Skip to content

Commit

Permalink
Derive all internal contexts from Client context
Browse files Browse the repository at this point in the history
We were previously using `context.Background()` in some situations to
avoid cancellation of things that really shouldn't be cancelled by the
user context cancellation. However, we can now do that with
`context.WithoutCancel` instead in order to preserve context values
without any cancellation or deadline.

Fixes #512.
  • Loading branch information
bgentry committed Aug 7, 2024
1 parent ab7f616 commit e500ef7
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 12 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- Derive all internal contexts from user-provided `Client` context. This includes the job fetch context, notifier unlisten, and completer. [PR #514](https://github.com/riverqueue/river/pull/514).

## [0.11.1] - 2024-08-05

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion internal/jobcompleter/job_completer.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ func (c *BatchCompleter) waitOrInitBacklogChannel(ctx context.Context) {
const numRetries = 3

func withRetries[T any](logCtx context.Context, baseService *baseservice.BaseService, disableSleep bool, retryFunc func(ctx context.Context) (T, error)) (T, error) {
uncancelledCtx := context.Background()
uncancelledCtx := context.WithoutCancel(logCtx)

var (
defaultVal T
Expand Down
4 changes: 2 additions & 2 deletions internal/notifier/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ type Subscription struct {

func (s *Subscription) Unlisten(ctx context.Context) {
s.unlistenOnce.Do(func() {
// Unlisten uses background context in case of cancellation.
if err := s.notifier.unlisten(context.Background(), s); err != nil { //nolint:contextcheck
// Unlisten strips cancellation from the parent context to ensure it runs:
if err := s.notifier.unlisten(context.WithoutCancel(ctx), s); err != nil { //nolint:contextcheck
s.notifier.Logger.ErrorContext(ctx, s.notifier.Name+": Error unlistening on topic", "err", err, "topic", s.topic)
}
})
Expand Down
19 changes: 10 additions & 9 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context, fetchLimit

func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan producerFetchResult) {
limit := p.maxJobsToFetch()
go p.dispatchWork(limit, fetchResultCh) //nolint:contextcheck
go p.dispatchWork(workCtx, limit, fetchResultCh)

for {
select {
Expand Down Expand Up @@ -509,14 +509,15 @@ func (p *producer) maybeCancelJob(id int64) {
executor.Cancel()
}

func (p *producer) dispatchWork(count int, fetchResultCh chan<- producerFetchResult) {
// This intentionally uses a background context because we don't want it to
// get cancelled if the producer is asked to shut down. In that situation, we
// want to finish fetching any jobs we are in the midst of fetching, work
// them, and then stop. Otherwise we'd have a risk of shutting down when we
// had already fetched jobs in the database, leaving those jobs stranded. We'd
// then potentially have to release them back to the queue.
jobs, err := p.exec.JobGetAvailable(context.Background(), &riverdriver.JobGetAvailableParams{
func (p *producer) dispatchWork(workCtx context.Context, count int, fetchResultCh chan<- producerFetchResult) {
// This intentionally removes any deadlines or cancelleation from the parent
// context because we don't want it to get cancelled if the producer is asked
// to shut down. In that situation, we want to finish fetching any jobs we are
// in the midst of fetching, work them, and then stop. Otherwise we'd have a
// risk of shutting down when we had already fetched jobs in the database,
// leaving those jobs stranded. We'd then potentially have to release them
// back to the queue.
jobs, err := p.exec.JobGetAvailable(context.WithoutCancel(workCtx), &riverdriver.JobGetAvailableParams{
AttemptedBy: p.config.ClientID,
Max: count,
Queue: p.config.Queue,
Expand Down

0 comments on commit e500ef7

Please sign in to comment.