Skip to content

Commit

Permalink
Idempotent queue pause and resume
Browse files Browse the repository at this point in the history
I was writing tests for River UI and found that when trying to pause an
already paused queue or resume an unpaused queue, River returns a "not
found" error.

This was quite surprising, so much so that it took me a good half hour
of debugging before I considered the fact that it might actually be a
problem in upstream River. Even if an argument were to be made that
pausing an already paused queue should be an error like "queue already
paused", returning "not found" is misleading and guaranteed to result
in confused people beyond just myself.

I think it's fine for pause and resume to be considered idempotent
operations. If a paused queue is paused again, there's no real damage
done, especially if we keep the original paused time, which we do in
this implementation.

Similarly, make an update so that when pausing or resuming using  the
all queues string (`*`), don't return "not found" error if there are no
queues in the database. Instead, just no-op.
  • Loading branch information
brandur committed Jun 29, 2024
1 parent 9a03e6f commit 52b6144
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 44 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- Pausing or resuming a queue that was already paused or not paused respectively no longer returns `rivertype.ErrNotFound`. The same goes for pausing or resuming using the all queues string (`*`) when no queues are in the database (previously that also returned `rivertype.ErrNotFound`). [PR #408](https://github.com/riverqueue/river/pull/408).

## [0.8.0] - 2024-06-25

### Added
Expand Down
61 changes: 55 additions & 6 deletions internal/riverinternaltest/riverdrivertest/riverdrivertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -1949,7 +1949,25 @@ func ExerciseExecutorFull[TTx any](ctx context.Context, t *testing.T, driver riv
t.Run("QueuePause", func(t *testing.T) {
t.Parallel()

t.Run("ExistingQueue", func(t *testing.T) {
t.Run("ExistingPausedQueue", func(t *testing.T) {
t.Parallel()

exec, _ := setupExecutor(ctx, t, driver, beginTx)

queue := testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{
PausedAt: ptrutil.Ptr(time.Now()),
})

require.NoError(t, exec.QueuePause(ctx, queue.Name))

queueFetched, err := exec.QueueGet(ctx, queue.Name)
require.NoError(t, err)
require.NotNil(t, queueFetched.PausedAt)
requireEqualTime(t, *queue.PausedAt, *queueFetched.PausedAt) // paused_at stays unchanged
requireEqualTime(t, queue.UpdatedAt, queueFetched.UpdatedAt) // updated_at stays unchanged
})

t.Run("ExistingUnpausedQueue", func(t *testing.T) {
t.Parallel()

exec, _ := setupExecutor(ctx, t, driver, beginTx)
Expand All @@ -1974,7 +1992,7 @@ func ExerciseExecutorFull[TTx any](ctx context.Context, t *testing.T, driver riv
require.ErrorIs(t, err, rivertype.ErrNotFound)
})

t.Run("AllQueues", func(t *testing.T) {
t.Run("AllQueuesExistingQueues", func(t *testing.T) {
t.Parallel()

exec, _ := setupExecutor(ctx, t, driver, beginTx)
Expand All @@ -1998,25 +2016,48 @@ func ExerciseExecutorFull[TTx any](ctx context.Context, t *testing.T, driver riv
require.NotNil(t, queue2Fetched.PausedAt)
require.WithinDuration(t, now, *(queue2Fetched.PausedAt), 500*time.Millisecond)
})

t.Run("AllQueuesNoQueues", func(t *testing.T) {
t.Parallel()

exec, _ := setupExecutor(ctx, t, driver, beginTx)

require.NoError(t, exec.QueuePause(ctx, rivercommon.AllQueuesString))
})
})

t.Run("QueueResume", func(t *testing.T) {
t.Parallel()

t.Run("ExistingQueue", func(t *testing.T) {
t.Run("ExistingPausedQueue", func(t *testing.T) {
t.Parallel()

exec, _ := setupExecutor(ctx, t, driver, beginTx)

queue := testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{
PausedAt: ptrutil.Ptr(time.Now()),
})

require.NoError(t, exec.QueueResume(ctx, queue.Name))

queueFetched, err := exec.QueueGet(ctx, queue.Name)
require.NoError(t, err)
require.Nil(t, queueFetched.PausedAt)
})

t.Run("ExistingUnpausedQueue", func(t *testing.T) {
t.Parallel()

exec, _ := setupExecutor(ctx, t, driver, beginTx)

queue := testfactory.Queue(ctx, t, exec, nil)
require.Nil(t, queue.PausedAt)

require.NoError(t, exec.QueuePause(ctx, queue.Name))
require.NoError(t, exec.QueueResume(ctx, queue.Name))

queueFetched, err := exec.QueueGet(ctx, queue.Name)
require.NoError(t, err)
require.Nil(t, queueFetched.PausedAt)
requireEqualTime(t, queue.UpdatedAt, queueFetched.UpdatedAt) // updated_at stays unchanged
})

t.Run("NonExistentQueue", func(t *testing.T) {
Expand All @@ -2028,7 +2069,7 @@ func ExerciseExecutorFull[TTx any](ctx context.Context, t *testing.T, driver riv
require.ErrorIs(t, err, rivertype.ErrNotFound)
})

t.Run("AllQueues", func(t *testing.T) {
t.Run("AllQueuesExistingQueues", func(t *testing.T) {
t.Parallel()

exec, _ := setupExecutor(ctx, t, driver, beginTx)
Expand All @@ -2049,6 +2090,14 @@ func ExerciseExecutorFull[TTx any](ctx context.Context, t *testing.T, driver riv
require.NoError(t, err)
require.Nil(t, queue2Fetched.PausedAt)
})

t.Run("AllQueuesNoQueues", func(t *testing.T) {
t.Parallel()

exec, _ := setupExecutor(ctx, t, driver, beginTx)

require.NoError(t, exec.QueueResume(ctx, rivercommon.AllQueuesString))
})
})
})
}
Expand Down
2 changes: 2 additions & 0 deletions riverdriver/river_driver_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/riverqueue/river/rivertype"
)

const AllQueuesString = "*"

var (
ErrClosedPool = errors.New("underlying driver pool is closed")
ErrNotImplemented = errors.New("driver does not implement this functionality")
Expand Down
53 changes: 35 additions & 18 deletions riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql
Original file line number Diff line number Diff line change
Expand Up @@ -48,32 +48,49 @@ LIMIT @limit_count::integer;

-- name: QueuePause :execresult
WITH queue_to_update AS (
SELECT name
SELECT name, paused_at
FROM river_queue
WHERE CASE WHEN @name::text = '*' THEN true ELSE river_queue.name = @name::text END
AND paused_at IS NULL
WHERE CASE WHEN @name::text = '*' THEN true ELSE name = @name END
FOR UPDATE
),
updated_queue AS (
UPDATE river_queue
SET
paused_at = now(),
updated_at = now()
FROM queue_to_update
WHERE river_queue.name = queue_to_update.name
AND river_queue.paused_at IS NULL
RETURNING river_queue.*
)

UPDATE river_queue
SET
paused_at = now(),
updated_at = now()
FROM queue_to_update
WHERE river_queue.name = queue_to_update.name;
SELECT *
FROM river_queue
WHERE name = @name
AND name NOT IN (SELECT name FROM updated_queue)
UNION
SELECT *
FROM updated_queue;

-- name: QueueResume :execresult
WITH queue_to_update AS (
SELECT name
FROM river_queue
WHERE CASE WHEN @name::text = '*' THEN true ELSE river_queue.name = @name::text END
AND paused_at IS NOT NULL
FOR UPDATE
),
updated_queue AS (
UPDATE river_queue
SET
paused_at = NULL,
updated_at = now()
FROM queue_to_update
WHERE river_queue.name = queue_to_update.name
RETURNING river_queue.*
)

UPDATE river_queue
SET
paused_at = NULL,
updated_at = now()
FROM queue_to_update
WHERE river_queue.name = queue_to_update.name;
SELECT *
FROM river_queue
WHERE name = @name
AND name NOT IN (SELECT name FROM updated_queue)
UNION
SELECT *
FROM updated_queue;
53 changes: 35 additions & 18 deletions riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions riverdriver/riverpgxv5/river_pgx_v5_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ func (e *Executor) QueuePause(ctx context.Context, name string) error {
if err != nil {
return interpretError(err)
}
if res.RowsAffected() == 0 {
if res.RowsAffected() == 0 && name != riverdriver.AllQueuesString {
return rivertype.ErrNotFound
}
return nil
Expand All @@ -535,7 +535,7 @@ func (e *Executor) QueueResume(ctx context.Context, name string) error {
if err != nil {
return interpretError(err)
}
if res.RowsAffected() == 0 {
if res.RowsAffected() == 0 && name != riverdriver.AllQueuesString {
return rivertype.ErrNotFound
}
return nil
Expand Down

0 comments on commit 52b6144

Please sign in to comment.