diff --git a/CHANGELOG.md b/CHANGELOG.md index a02809f7..308cfad9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Cancellation of running jobs relied on a channel that was only being received when in the job fetch routine, meaning that jobs which were cancelled would not be cancelled until the next scheduled fetch. This was fixed by also receiving from the job cancellation channel when in the main producer loop, even if no fetches are happening. [PR #678](https://github.com/riverqueue/river/pull/678). + ## [0.14.1] - 2024-11-04 ### Fixed diff --git a/client_test.go b/client_test.go index b0529845..8b47c4f2 100644 --- a/client_test.go +++ b/client_test.go @@ -445,8 +445,12 @@ func Test_Client(t *testing.T) { // _outside of_ a transaction. The exact same test logic applies to each case, // the only difference is a different cancelFunc provided by the specific // subtest. - cancelRunningJobTestHelper := func(t *testing.T, cancelFunc func(ctx context.Context, dbPool *pgxpool.Pool, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error)) { //nolint:thelper - client, bundle := setup(t) + cancelRunningJobTestHelper := func(t *testing.T, config *Config, cancelFunc func(ctx context.Context, dbPool *pgxpool.Pool, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error)) { //nolint:thelper + defaultConfig, bundle := setupConfig(t) + if config == nil { + config = defaultConfig + } + client := newTestClient(t, bundle.dbPool, config) jobStartedChan := make(chan int64) @@ -492,7 +496,17 @@ func Test_Client(t *testing.T) { t.Run("CancelRunningJob", func(t *testing.T) { t.Parallel() - cancelRunningJobTestHelper(t, func(ctx context.Context, dbPool *pgxpool.Pool, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error) { + cancelRunningJobTestHelper(t, nil, func(ctx context.Context, dbPool *pgxpool.Pool, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error) { + return client.JobCancel(ctx, jobID) + }) + }) + + t.Run("CancelRunningJobWithLongPollInterval", func(t *testing.T) { + t.Parallel() + + config := newTestConfig(t, nil) + config.FetchPollInterval = 60 * time.Second + cancelRunningJobTestHelper(t, config, func(ctx context.Context, dbPool *pgxpool.Pool, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error) { return client.JobCancel(ctx, jobID) }) }) @@ -500,7 +514,7 @@ func Test_Client(t *testing.T) { t.Run("CancelRunningJobInTx", func(t *testing.T) { t.Parallel() - cancelRunningJobTestHelper(t, func(ctx context.Context, dbPool *pgxpool.Pool, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error) { + cancelRunningJobTestHelper(t, nil, func(ctx context.Context, dbPool *pgxpool.Pool, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error) { var ( job *rivertype.JobRow err error diff --git a/go.work.sum b/go.work.sum index eaf0f08b..8ff85faf 100644 --- a/go.work.sum +++ b/go.work.sum @@ -4,11 +4,11 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/kr/pty v1.1.1 h1:VkoXIwSboBpnk99O/KFauAEILuNHv5DVFKZMBN/gUgw= github.com/riverqueue/river v0.13.0-rc.1/go.mod h1:ZxTeoNZWNpwl+dCBWF5AomGV1exZbHu/E75ufb09HIo= -github.com/riverqueue/river v0.14.0/go.mod h1:R98qxNGrFOm1rtapS76Ef6y2WbQ56jtOc2kuVSKW/zA= -github.com/riverqueue/river/riverdriver v0.14.0/go.mod h1:DUayJJgiCWwfnsLC3sLBuM/N1cRh2lEoAohV6bHeaiA= -github.com/riverqueue/river/riverdriver/riverdatabasesql v0.14.0/go.mod h1:G6ymkGCy+H6SmRUTSBC9uXnk+dy4TttkuM5L1yS/KDA= -github.com/riverqueue/river/riverdriver/riverpgxv5 v0.14.0/go.mod h1:VlHbD3GF4ioT52J2S2VM2cFHbuG8D9u1bIbT4R/JuPE= -github.com/riverqueue/river/rivertype v0.14.0/go.mod h1:wVOhGBeay6+JcIi0pTFlF4KtUgHYFkhMYv8dpxU46W0= +github.com/riverqueue/river v0.14.1/go.mod h1:3cQREff7+iGZC+u2lire03SOxUmT41bjzpqZWAWPXtk= +github.com/riverqueue/river/riverdriver v0.14.1/go.mod h1:bJDNRwDNiCyXv3ZEfOGUvGBEo6C3fNnPc4VQRF1P+Ys= +github.com/riverqueue/river/riverdriver/riverdatabasesql v0.14.1/go.mod h1:C+A3pzwxMwyclSwfeTRyWoDRoFd9BhNmsSPSe8bv4l8= +github.com/riverqueue/river/riverdriver/riverpgxv5 v0.14.1/go.mod h1:P9rfgq0hgRM19ty6CHMQTAKUq3crmP28f4BINDfRCyw= +github.com/riverqueue/river/rivertype v0.14.1/go.mod h1:wVOhGBeay6+JcIi0pTFlF4KtUgHYFkhMYv8dpxU46W0= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= diff --git a/producer.go b/producer.go index b58fa505..0ad16ffe 100644 --- a/producer.go +++ b/producer.go @@ -448,6 +448,8 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context, fetchLimit default: p.Logger.DebugContext(workCtx, p.Name+": Unknown queue control action", "action", msg.Action) } + case jobID := <-p.cancelCh: + p.maybeCancelJob(jobID) case <-fetchLimiter.C(): if p.paused { continue