Skip to content

Commit

Permalink
Fix running job cancellation when not fetching (riverqueue#678)
Browse files Browse the repository at this point in the history
* fix running job cancellation when not fetching

Cancellation of running jobs relied on a channel that was only being
received when in the job fetch routine, meaning that jobs which were
cancelled would not be cancelled until the next scheduled fetch.

This was fixed by also receiving from the job cancellation channel when
in the main producer loop, even if no fetches are happening.
  • Loading branch information
bgentry authored and tigrato committed Dec 18, 2024
1 parent 9d6cb3f commit 82988b6
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 9 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- Cancellation of running jobs relied on a channel that was only being received when in the job fetch routine, meaning that jobs which were cancelled would not be cancelled until the next scheduled fetch. This was fixed by also receiving from the job cancellation channel when in the main producer loop, even if no fetches are happening. [PR #678](https://github.com/riverqueue/river/pull/678).

## [0.14.1] - 2024-11-04

### Fixed
Expand Down
22 changes: 18 additions & 4 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,8 +445,12 @@ func Test_Client(t *testing.T) {
// _outside of_ a transaction. The exact same test logic applies to each case,
// the only difference is a different cancelFunc provided by the specific
// subtest.
cancelRunningJobTestHelper := func(t *testing.T, cancelFunc func(ctx context.Context, dbPool *pgxpool.Pool, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error)) { //nolint:thelper
client, bundle := setup(t)
cancelRunningJobTestHelper := func(t *testing.T, config *Config, cancelFunc func(ctx context.Context, dbPool *pgxpool.Pool, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error)) { //nolint:thelper
defaultConfig, bundle := setupConfig(t)
if config == nil {
config = defaultConfig
}
client := newTestClient(t, bundle.dbPool, config)

jobStartedChan := make(chan int64)

Expand Down Expand Up @@ -492,15 +496,25 @@ func Test_Client(t *testing.T) {
t.Run("CancelRunningJob", func(t *testing.T) {
t.Parallel()

cancelRunningJobTestHelper(t, func(ctx context.Context, dbPool *pgxpool.Pool, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error) {
cancelRunningJobTestHelper(t, nil, func(ctx context.Context, dbPool *pgxpool.Pool, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error) {
return client.JobCancel(ctx, jobID)
})
})

t.Run("CancelRunningJobWithLongPollInterval", func(t *testing.T) {
t.Parallel()

config := newTestConfig(t, nil)
config.FetchPollInterval = 60 * time.Second
cancelRunningJobTestHelper(t, config, func(ctx context.Context, dbPool *pgxpool.Pool, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error) {
return client.JobCancel(ctx, jobID)
})
})

t.Run("CancelRunningJobInTx", func(t *testing.T) {
t.Parallel()

cancelRunningJobTestHelper(t, func(ctx context.Context, dbPool *pgxpool.Pool, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error) {
cancelRunningJobTestHelper(t, nil, func(ctx context.Context, dbPool *pgxpool.Pool, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error) {
var (
job *rivertype.JobRow
err error
Expand Down
10 changes: 5 additions & 5 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/kr/pty v1.1.1 h1:VkoXIwSboBpnk99O/KFauAEILuNHv5DVFKZMBN/gUgw=
github.com/riverqueue/river v0.13.0-rc.1/go.mod h1:ZxTeoNZWNpwl+dCBWF5AomGV1exZbHu/E75ufb09HIo=
github.com/riverqueue/river v0.14.0/go.mod h1:R98qxNGrFOm1rtapS76Ef6y2WbQ56jtOc2kuVSKW/zA=
github.com/riverqueue/river/riverdriver v0.14.0/go.mod h1:DUayJJgiCWwfnsLC3sLBuM/N1cRh2lEoAohV6bHeaiA=
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.14.0/go.mod h1:G6ymkGCy+H6SmRUTSBC9uXnk+dy4TttkuM5L1yS/KDA=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.14.0/go.mod h1:VlHbD3GF4ioT52J2S2VM2cFHbuG8D9u1bIbT4R/JuPE=
github.com/riverqueue/river/rivertype v0.14.0/go.mod h1:wVOhGBeay6+JcIi0pTFlF4KtUgHYFkhMYv8dpxU46W0=
github.com/riverqueue/river v0.14.1/go.mod h1:3cQREff7+iGZC+u2lire03SOxUmT41bjzpqZWAWPXtk=
github.com/riverqueue/river/riverdriver v0.14.1/go.mod h1:bJDNRwDNiCyXv3ZEfOGUvGBEo6C3fNnPc4VQRF1P+Ys=
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.14.1/go.mod h1:C+A3pzwxMwyclSwfeTRyWoDRoFd9BhNmsSPSe8bv4l8=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.14.1/go.mod h1:P9rfgq0hgRM19ty6CHMQTAKUq3crmP28f4BINDfRCyw=
github.com/riverqueue/river/rivertype v0.14.1/go.mod h1:wVOhGBeay6+JcIi0pTFlF4KtUgHYFkhMYv8dpxU46W0=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
Expand Down
2 changes: 2 additions & 0 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,8 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context, fetchLimit
default:
p.Logger.DebugContext(workCtx, p.Name+": Unknown queue control action", "action", msg.Action)
}
case jobID := <-p.cancelCh:
p.maybeCancelJob(jobID)
case <-fetchLimiter.C():
if p.paused {
continue
Expand Down

0 comments on commit 82988b6

Please sign in to comment.