Skip to content

Commit

Permalink
add JobDelete / JobDeleteTx APIs to Client
Browse files Browse the repository at this point in the history
This adds new APIs to permanently delete a job from the job table, so
long as it is not currently running.
  • Loading branch information
bgentry committed Jun 12, 2024
1 parent cb43f2c commit 350947d
Show file tree
Hide file tree
Showing 12 changed files with 349 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- The default max attempts of 25 can now be customized on a per-client basis using `Config.MaxAttempts`. This is in addition to the ability to customize at the job type level with `JobArgs`, or on a per-job basis using `InsertOpts`. [PR #383](https://github.com/riverqueue/river/pull/383).
- Add `JobDelete` / `JobDeleteTx` APIs on `Client` to allow permanently deleting any job that's not currently running. [PR #390](https://github.com/riverqueue/river/pull/390).

### Fixed

Expand Down
17 changes: 17 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,23 @@ func (c *Client[TTx]) jobCancel(ctx context.Context, exec riverdriver.Executor,
})
}

// JobDelete deletes the job with the given ID from the database, returning the
// deleted row if it was deleted. Jobs in the running state are not deleted,
// instead returning rivertype.ErrJobRunning.
func (c *Client[TTx]) JobDelete(ctx context.Context, id int64) (*rivertype.JobRow, error) {
return c.driver.GetExecutor().JobDelete(ctx, id)
}

// JobDelete deletes the job with the given ID from the database, returning the
// deleted row if it was deleted. Jobs in the running state are not deleted,
// instead returning rivertype.ErrJobRunning. This variant lets a caller retry a
// job atomically alongside other database changes. A deleted job isn't deleted
// until the transaction commits, and if the transaction rolls back, so too is
// the deleted job.
func (c *Client[TTx]) JobDeleteTx(ctx context.Context, tx TTx, id int64) (*rivertype.JobRow, error) {
return c.driver.UnwrapExecutor(tx).JobDelete(ctx, id)
}

// JobGet fetches a single job by its ID. Returns the up-to-date JobRow for the
// specified jobID if it exists. Returns ErrNotFound if the job doesn't exist.
func (c *Client[TTx]) JobGet(ctx context.Context, id int64) (*rivertype.JobRow, error) {
Expand Down
108 changes: 108 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1010,6 +1010,114 @@ func Test_Client_ClientFromContext(t *testing.T) {
require.Equal(t, client, clientResult)
}

func Test_Client_JobDelete(t *testing.T) {
t.Parallel()

ctx := context.Background()

type testBundle struct {
dbPool *pgxpool.Pool
}

setup := func(t *testing.T) (*Client[pgx.Tx], *testBundle) {
t.Helper()

dbPool := riverinternaltest.TestDB(ctx, t)
config := newTestConfig(t, nil)
client := newTestClient(t, dbPool, config)

return client, &testBundle{dbPool: dbPool}
}

t.Run("DeletesANonRunningJob", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)

insertRes, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{ScheduledAt: time.Now().Add(time.Hour)})
require.NoError(t, err)
require.Equal(t, rivertype.JobStateScheduled, insertRes.Job.State)

jobAfter, err := client.JobDelete(ctx, insertRes.Job.ID)
require.NoError(t, err)
require.NotNil(t, jobAfter)
require.Equal(t, rivertype.JobStateScheduled, jobAfter.State)

_, err = client.JobGet(ctx, insertRes.Job.ID)
require.ErrorIs(t, err, ErrNotFound)
})

t.Run("DoesNotDeleteARunningJob", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)

doneCh := make(chan struct{})
startedCh := make(chan int64)

AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[callbackArgs]) error {
close(startedCh)
<-doneCh
return nil
}))

require.NoError(t, client.Start(ctx))
t.Cleanup(func() { require.NoError(t, client.Stop(ctx)) })
t.Cleanup(func() { close(doneCh) }) // must close before stopping client

insertRes, err := client.Insert(ctx, callbackArgs{}, nil)
require.NoError(t, err)

// Wait for the job to start:
riverinternaltest.WaitOrTimeout(t, startedCh)

jobAfter, err := client.JobDelete(ctx, insertRes.Job.ID)
require.ErrorIs(t, err, rivertype.ErrJobRunning)
require.Nil(t, jobAfter)

jobFromGet, err := client.JobGet(ctx, insertRes.Job.ID)
require.NoError(t, err)
require.Equal(t, rivertype.JobStateRunning, jobFromGet.State)
})

t.Run("TxVariantAlsoDeletesANonRunningJob", func(t *testing.T) {
t.Parallel()

client, bundle := setup(t)

insertRes, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{ScheduledAt: time.Now().Add(time.Hour)})
require.NoError(t, err)
require.Equal(t, rivertype.JobStateScheduled, insertRes.Job.State)

var jobAfter *rivertype.JobRow

err = pgx.BeginFunc(ctx, bundle.dbPool, func(tx pgx.Tx) error {
var err error
jobAfter, err = client.JobDeleteTx(ctx, tx, insertRes.Job.ID)
return err
})
require.NoError(t, err)
require.NotNil(t, jobAfter)
require.Equal(t, insertRes.Job.ID, jobAfter.ID)
require.Equal(t, rivertype.JobStateScheduled, jobAfter.State)

jobFromGet, err := client.JobGet(ctx, insertRes.Job.ID)
require.ErrorIs(t, ErrNotFound, err)
require.Nil(t, jobFromGet)
})

t.Run("ReturnsErrNotFoundIfJobDoesNotExist", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)

jobAfter, err := client.JobDelete(ctx, 0)
require.Error(t, err)
require.ErrorIs(t, err, ErrNotFound)
require.Nil(t, jobAfter)
})
}

func Test_Client_Insert(t *testing.T) {
t.Parallel()

Expand Down
80 changes: 79 additions & 1 deletion internal/riverinternaltest/riverdrivertest/riverdrivertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,84 @@ func ExerciseExecutorFull[TTx any](ctx context.Context, t *testing.T, driver riv
require.Equal(t, 2, numJobs)
})

t.Run("JobDelete", func(t *testing.T) {
t.Parallel()

t.Run("DoesNotDeleteARunningJob", func(t *testing.T) {
t.Parallel()

exec, _ := setupExecutor(ctx, t, driver, beginTx)

job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
State: ptrutil.Ptr(rivertype.JobStateRunning),
})

jobAfter, err := exec.JobDelete(ctx, job.ID)
require.ErrorIs(t, err, rivertype.ErrJobRunning)
require.Nil(t, jobAfter)

jobUpdated, err := exec.JobGetByID(ctx, job.ID)
require.NoError(t, err)
require.Equal(t, rivertype.JobStateRunning, jobUpdated.State)
})

for _, state := range []rivertype.JobState{
rivertype.JobStateAvailable,
rivertype.JobStateCancelled,
rivertype.JobStateCompleted,
rivertype.JobStateDiscarded,
rivertype.JobStatePending,
rivertype.JobStateRetryable,
rivertype.JobStateScheduled,
} {
state := state

t.Run(fmt.Sprintf("DeletesA_%s_Job", state), func(t *testing.T) {
t.Parallel()

exec, _ := setupExecutor(ctx, t, driver, beginTx)

now := time.Now().UTC()

setFinalized := slices.Contains([]rivertype.JobState{
rivertype.JobStateCancelled,
rivertype.JobStateCompleted,
rivertype.JobStateDiscarded,
}, state)

var finalizedAt *time.Time
if setFinalized {
finalizedAt = &now
}

job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
FinalizedAt: finalizedAt,
ScheduledAt: ptrutil.Ptr(now.Add(1 * time.Hour)),
State: &state,
})

jobAfter, err := exec.JobDelete(ctx, job.ID)
require.NoError(t, err)
require.NotNil(t, jobAfter)
require.Equal(t, job.ID, jobAfter.ID)
require.Equal(t, state, jobAfter.State)

_, err = exec.JobGetByID(ctx, job.ID)
require.ErrorIs(t, err, rivertype.ErrNotFound)
})
}

t.Run("ReturnsErrNotFoundIfJobDoesNotExist", func(t *testing.T) {
t.Parallel()

exec, _ := setupExecutor(ctx, t, driver, beginTx)

jobAfter, err := exec.JobDelete(ctx, 1234567890)
require.ErrorIs(t, err, rivertype.ErrNotFound)
require.Nil(t, jobAfter)
})
})

t.Run("JobDeleteBefore", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -1007,7 +1085,7 @@ func ExerciseExecutorFull[TTx any](ctx context.Context, t *testing.T, driver riv
rivertype.JobStateCancelled,
rivertype.JobStateCompleted,
rivertype.JobStateDiscarded,
// TODO(bgentry): add Pending to this list when it's added:
rivertype.JobStatePending,
rivertype.JobStateRetryable,
rivertype.JobStateScheduled,
} {
Expand Down
1 change: 1 addition & 0 deletions riverdriver/river_driver_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type Executor interface {

JobCancel(ctx context.Context, params *JobCancelParams) (*rivertype.JobRow, error)
JobCountByState(ctx context.Context, state rivertype.JobState) (int, error)
JobDelete(ctx context.Context, id int64) (*rivertype.JobRow, error)
JobDeleteBefore(ctx context.Context, params *JobDeleteBeforeParams) (int, error)
JobGetAvailable(ctx context.Context, params *JobGetAvailableParams) ([]*rivertype.JobRow, error)
JobGetByID(ctx context.Context, id int64) (*rivertype.JobRow, error)
Expand Down
49 changes: 49 additions & 0 deletions riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions riverdriver/riverdatabasesql/river_database_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ func (e *Executor) JobCountByState(ctx context.Context, state rivertype.JobState
return 0, riverdriver.ErrNotImplemented
}

func (e *Executor) JobDelete(ctx context.Context, id int64) (*rivertype.JobRow, error) {
return nil, riverdriver.ErrNotImplemented
}

func (e *Executor) JobDeleteBefore(ctx context.Context, params *riverdriver.JobDeleteBeforeParams) (int, error) {
return 0, riverdriver.ErrNotImplemented
}
Expand Down
24 changes: 24 additions & 0 deletions riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,30 @@ SELECT count(*)
FROM river_job
WHERE state = @state;

-- name: JobDelete :one
WITH job_to_delete AS (
SELECT id
FROM river_job
WHERE river_job.id = @id
FOR UPDATE
),
deleted_job AS (
DELETE
FROM river_job
USING job_to_delete
WHERE river_job.id = job_to_delete.id
-- Do not touch running jobs:
AND river_job.state != 'running'::river_job_state
RETURNING river_job.*
)
SELECT *
FROM river_job
WHERE id = @id::bigint
AND id NOT IN (SELECT id FROM deleted_job)
UNION
SELECT *
FROM deleted_job;

-- name: JobDeleteBefore :one
WITH deleted_jobs AS (
DELETE FROM river_job
Expand Down
Loading

0 comments on commit 350947d

Please sign in to comment.