diff --git a/CHANGELOG.md b/CHANGELOG.md index 6ed9a5e3..7febb715 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - The default max attempts of 25 can now be customized on a per-client basis using `Config.MaxAttempts`. This is in addition to the ability to customize at the job type level with `JobArgs`, or on a per-job basis using `InsertOpts`. [PR #383](https://github.com/riverqueue/river/pull/383). +- Add `JobDelete` / `JobDeleteTx` APIs on `Client` to allow permanently deleting any job that's not currently running. [PR #390](https://github.com/riverqueue/river/pull/390). ### Fixed diff --git a/client.go b/client.go index 700298b8..7a37ddb5 100644 --- a/client.go +++ b/client.go @@ -1047,6 +1047,22 @@ func (c *Client[TTx]) jobCancel(ctx context.Context, exec riverdriver.Executor, }) } +// JobDelete deletes the job with the given ID from the database, returning +// whether or not the job was deleted along with a possible error. Jobs in the +// running state are not deleted. +func (c *Client[TTx]) JobDelete(ctx context.Context, id int64) (*rivertype.JobRow, error) { + return c.driver.GetExecutor().JobDelete(ctx, id) +} + +// JobDeleteTx deletes the job with the given ID from the database, returning +// whether or not the job was deleted along with a possible error. This variant +// lets a caller retry a job atomically alongside other database changes. A +// deleted job isn't deleted until the transaction commits, and if the +// transaction rolls back, so too is the deleted job. +func (c *Client[TTx]) JobDeleteTx(ctx context.Context, tx TTx, id int64) (*rivertype.JobRow, error) { + return c.driver.UnwrapExecutor(tx).JobDelete(ctx, id) +} + // JobGet fetches a single job by its ID. Returns the up-to-date JobRow for the // specified jobID if it exists. Returns ErrNotFound if the job doesn't exist. func (c *Client[TTx]) JobGet(ctx context.Context, id int64) (*rivertype.JobRow, error) { diff --git a/client_test.go b/client_test.go index 754e17bb..9b9a2605 100644 --- a/client_test.go +++ b/client_test.go @@ -1010,6 +1010,114 @@ func Test_Client_ClientFromContext(t *testing.T) { require.Equal(t, client, clientResult) } +func Test_Client_JobDelete(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + type testBundle struct { + dbPool *pgxpool.Pool + } + + setup := func(t *testing.T) (*Client[pgx.Tx], *testBundle) { + t.Helper() + + dbPool := riverinternaltest.TestDB(ctx, t) + config := newTestConfig(t, nil) + client := newTestClient(t, dbPool, config) + + return client, &testBundle{dbPool: dbPool} + } + + t.Run("DeletesANonRunningJob", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + insertRes, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{ScheduledAt: time.Now().Add(time.Hour)}) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateScheduled, insertRes.Job.State) + + jobAfter, err := client.JobDelete(ctx, insertRes.Job.ID) + require.NoError(t, err) + require.NotNil(t, jobAfter) + require.Equal(t, rivertype.JobStateScheduled, jobAfter.State) + + _, err = client.JobGet(ctx, insertRes.Job.ID) + require.ErrorIs(t, err, ErrNotFound) + }) + + t.Run("DoesNotDeleteARunningJob", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + doneCh := make(chan struct{}) + startedCh := make(chan int64) + + AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[callbackArgs]) error { + close(startedCh) + <-doneCh + return nil + })) + + require.NoError(t, client.Start(ctx)) + t.Cleanup(func() { require.NoError(t, client.Stop(ctx)) }) + t.Cleanup(func() { close(doneCh) }) // must close before stopping client + + insertRes, err := client.Insert(ctx, callbackArgs{}, nil) + require.NoError(t, err) + + // Wait for the job to start: + riverinternaltest.WaitOrTimeout(t, startedCh) + + jobAfter, err := client.JobDelete(ctx, insertRes.Job.ID) + require.ErrorIs(t, err, rivertype.ErrJobRunning) + require.Nil(t, jobAfter) + + jobFromGet, err := client.JobGet(ctx, insertRes.Job.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateRunning, jobFromGet.State) + }) + + t.Run("TxVariantAlsoDeletesANonRunningJob", func(t *testing.T) { + t.Parallel() + + client, bundle := setup(t) + + insertRes, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{ScheduledAt: time.Now().Add(time.Hour)}) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateScheduled, insertRes.Job.State) + + var jobAfter *rivertype.JobRow + + err = pgx.BeginFunc(ctx, bundle.dbPool, func(tx pgx.Tx) error { + var err error + jobAfter, err = client.JobDeleteTx(ctx, tx, insertRes.Job.ID) + return err + }) + require.NoError(t, err) + require.NotNil(t, jobAfter) + require.Equal(t, insertRes.Job.ID, jobAfter.ID) + require.Equal(t, rivertype.JobStateScheduled, jobAfter.State) + + jobFromGet, err := client.JobGet(ctx, insertRes.Job.ID) + require.ErrorIs(t, ErrNotFound, err) + require.Nil(t, jobFromGet) + }) + + t.Run("ReturnsErrNotFoundIfJobDoesNotExist", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + jobAfter, err := client.JobDelete(ctx, 0) + require.Error(t, err) + require.ErrorIs(t, err, ErrNotFound) + require.Nil(t, jobAfter) + }) +} + func Test_Client_Insert(t *testing.T) { t.Parallel() diff --git a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go index 59343526..1e0b1343 100644 --- a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go +++ b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go @@ -191,6 +191,84 @@ func ExerciseExecutorFull[TTx any](ctx context.Context, t *testing.T, driver riv require.Equal(t, 2, numJobs) }) + t.Run("JobDelete", func(t *testing.T) { + t.Parallel() + + t.Run("DoesNotDeleteARunningJob", func(t *testing.T) { + t.Parallel() + + exec, _ := setupExecutor(ctx, t, driver, beginTx) + + job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + State: ptrutil.Ptr(rivertype.JobStateRunning), + }) + + jobAfter, err := exec.JobDelete(ctx, job.ID) + require.ErrorIs(t, err, rivertype.ErrJobRunning) + require.Nil(t, jobAfter) + + jobUpdated, err := exec.JobGetByID(ctx, job.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateRunning, jobUpdated.State) + }) + + for _, state := range []rivertype.JobState{ + rivertype.JobStateAvailable, + rivertype.JobStateCancelled, + rivertype.JobStateCompleted, + rivertype.JobStateDiscarded, + rivertype.JobStatePending, + rivertype.JobStateRetryable, + rivertype.JobStateScheduled, + } { + state := state + + t.Run(fmt.Sprintf("DeletesA_%s_Job", state), func(t *testing.T) { + t.Parallel() + + exec, _ := setupExecutor(ctx, t, driver, beginTx) + + now := time.Now().UTC() + + setFinalized := slices.Contains([]rivertype.JobState{ + rivertype.JobStateCancelled, + rivertype.JobStateCompleted, + rivertype.JobStateDiscarded, + }, state) + + var finalizedAt *time.Time + if setFinalized { + finalizedAt = &now + } + + job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + FinalizedAt: finalizedAt, + ScheduledAt: ptrutil.Ptr(now.Add(1 * time.Hour)), + State: &state, + }) + + jobAfter, err := exec.JobDelete(ctx, job.ID) + require.NoError(t, err) + require.NotNil(t, jobAfter) + require.Equal(t, job.ID, jobAfter.ID) + require.Equal(t, state, jobAfter.State) + + _, err = exec.JobGetByID(ctx, job.ID) + require.ErrorIs(t, err, rivertype.ErrNotFound) + }) + } + + t.Run("ReturnsErrNotFoundIfJobDoesNotExist", func(t *testing.T) { + t.Parallel() + + exec, _ := setupExecutor(ctx, t, driver, beginTx) + + jobAfter, err := exec.JobDelete(ctx, 1234567890) + require.ErrorIs(t, err, rivertype.ErrNotFound) + require.Nil(t, jobAfter) + }) + }) + t.Run("JobDeleteBefore", func(t *testing.T) { t.Parallel() @@ -1007,7 +1085,7 @@ func ExerciseExecutorFull[TTx any](ctx context.Context, t *testing.T, driver riv rivertype.JobStateCancelled, rivertype.JobStateCompleted, rivertype.JobStateDiscarded, - // TODO(bgentry): add Pending to this list when it's added: + rivertype.JobStatePending, rivertype.JobStateRetryable, rivertype.JobStateScheduled, } { diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index 61b70f00..511d2221 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -77,6 +77,7 @@ type Executor interface { JobCancel(ctx context.Context, params *JobCancelParams) (*rivertype.JobRow, error) JobCountByState(ctx context.Context, state rivertype.JobState) (int, error) + JobDelete(ctx context.Context, id int64) (*rivertype.JobRow, error) JobDeleteBefore(ctx context.Context, params *JobDeleteBeforeParams) (int, error) JobGetAvailable(ctx context.Context, params *JobGetAvailableParams) ([]*rivertype.JobRow, error) JobGetByID(ctx context.Context, id int64) (*rivertype.JobRow, error) diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go index d52e4216..d309c3c6 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go @@ -100,6 +100,55 @@ func (q *Queries) JobCountByState(ctx context.Context, db DBTX, state JobState) return count, err } +const jobDelete = `-- name: JobDelete :one +WITH job_to_delete AS ( + SELECT id + FROM river_job + WHERE river_job.id = $1 + FOR UPDATE +), +deleted_job AS ( + DELETE + FROM river_job + USING job_to_delete + WHERE river_job.id = job_to_delete.id + -- Do not touch running jobs: + AND river_job.state != 'running'::river_job_state + RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags +) +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +FROM river_job +WHERE id = $1::bigint + AND id NOT IN (SELECT id FROM deleted_job) +UNION +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +FROM deleted_job +` + +func (q *Queries) JobDelete(ctx context.Context, db DBTX, id int64) (*RiverJob, error) { + row := db.QueryRowContext(ctx, jobDelete, id) + var i RiverJob + err := row.Scan( + &i.ID, + &i.Args, + &i.Attempt, + &i.AttemptedAt, + pq.Array(&i.AttemptedBy), + &i.CreatedAt, + pq.Array(&i.Errors), + &i.FinalizedAt, + &i.Kind, + &i.MaxAttempts, + &i.Metadata, + &i.Priority, + &i.Queue, + &i.State, + &i.ScheduledAt, + pq.Array(&i.Tags), + ) + return &i, err +} + const jobDeleteBefore = `-- name: JobDeleteBefore :one WITH deleted_jobs AS ( DELETE FROM river_job diff --git a/riverdriver/riverdatabasesql/river_database_sql.go b/riverdriver/riverdatabasesql/river_database_sql.go index 4308a9f0..4423fd33 100644 --- a/riverdriver/riverdatabasesql/river_database_sql.go +++ b/riverdriver/riverdatabasesql/river_database_sql.go @@ -77,6 +77,10 @@ func (e *Executor) JobCountByState(ctx context.Context, state rivertype.JobState return 0, riverdriver.ErrNotImplemented } +func (e *Executor) JobDelete(ctx context.Context, id int64) (*rivertype.JobRow, error) { + return nil, riverdriver.ErrNotImplemented +} + func (e *Executor) JobDeleteBefore(ctx context.Context, params *riverdriver.JobDeleteBeforeParams) (int, error) { return 0, riverdriver.ErrNotImplemented } diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql index 3b54426a..fd5b7a5f 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql @@ -83,6 +83,30 @@ SELECT count(*) FROM river_job WHERE state = @state; +-- name: JobDelete :one +WITH job_to_delete AS ( + SELECT id + FROM river_job + WHERE river_job.id = @id + FOR UPDATE +), +deleted_job AS ( + DELETE + FROM river_job + USING job_to_delete + WHERE river_job.id = job_to_delete.id + -- Do not touch running jobs: + AND river_job.state != 'running'::river_job_state + RETURNING river_job.* +) +SELECT * +FROM river_job +WHERE id = @id::bigint + AND id NOT IN (SELECT id FROM deleted_job) +UNION +SELECT * +FROM deleted_job; + -- name: JobDeleteBefore :one WITH deleted_jobs AS ( DELETE FROM river_job diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go index 47b16bff..a902750d 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go @@ -97,6 +97,55 @@ func (q *Queries) JobCountByState(ctx context.Context, db DBTX, state RiverJobSt return count, err } +const jobDelete = `-- name: JobDelete :one +WITH job_to_delete AS ( + SELECT id + FROM river_job + WHERE river_job.id = $1 + FOR UPDATE +), +deleted_job AS ( + DELETE + FROM river_job + USING job_to_delete + WHERE river_job.id = job_to_delete.id + -- Do not touch running jobs: + AND river_job.state != 'running'::river_job_state + RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags +) +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +FROM river_job +WHERE id = $1::bigint + AND id NOT IN (SELECT id FROM deleted_job) +UNION +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +FROM deleted_job +` + +func (q *Queries) JobDelete(ctx context.Context, db DBTX, id int64) (*RiverJob, error) { + row := db.QueryRow(ctx, jobDelete, id) + var i RiverJob + err := row.Scan( + &i.ID, + &i.Args, + &i.Attempt, + &i.AttemptedAt, + &i.AttemptedBy, + &i.CreatedAt, + &i.Errors, + &i.FinalizedAt, + &i.Kind, + &i.MaxAttempts, + &i.Metadata, + &i.Priority, + &i.Queue, + &i.State, + &i.ScheduledAt, + &i.Tags, + ) + return &i, err +} + const jobDeleteBefore = `-- name: JobDeleteBefore :one WITH deleted_jobs AS ( DELETE FROM river_job diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index d04200c8..79114b6f 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -99,6 +99,17 @@ func (e *Executor) JobCountByState(ctx context.Context, state rivertype.JobState return int(numJobs), nil } +func (e *Executor) JobDelete(ctx context.Context, id int64) (*rivertype.JobRow, error) { + job, err := e.queries.JobDelete(ctx, e.dbtx, id) + if err != nil { + return nil, interpretError(err) + } + if job.State == dbsqlc.RiverJobStateRunning { + return nil, rivertype.ErrJobRunning + } + return jobRowFromInternal(job), nil +} + func (e *Executor) JobDeleteBefore(ctx context.Context, params *riverdriver.JobDeleteBeforeParams) (int, error) { numDeleted, err := e.queries.JobDeleteBefore(ctx, e.dbtx, &dbsqlc.JobDeleteBeforeParams{ CancelledFinalizedAtHorizon: params.CancelledFinalizedAtHorizon, diff --git a/rivertype/river_type.go b/rivertype/river_type.go index 19a2b642..e9ffc906 100644 --- a/rivertype/river_type.go +++ b/rivertype/river_type.go @@ -13,6 +13,10 @@ import ( // return this error. var ErrNotFound = errors.New("not found") +// ErrJobRunning is returned when a job is attempted to be deleted while it's +// running. +var ErrJobRunning = errors.New("job is running") + // JobInsertResult is the result of a job insert, containing the inserted job // along with some other useful metadata. type JobInsertResult struct { diff --git a/subscription_manager_test.go b/subscription_manager_test.go index be005008..afe13f5b 100644 --- a/subscription_manager_test.go +++ b/subscription_manager_test.go @@ -6,6 +6,8 @@ import ( "time" "github.com/jackc/pgx/v5" + "github.com/stretchr/testify/require" + "github.com/riverqueue/river/internal/jobcompleter" "github.com/riverqueue/river/internal/jobstats" "github.com/riverqueue/river/internal/riverinternaltest" @@ -15,7 +17,6 @@ import ( "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/riverdriver/riverpgxv5" "github.com/riverqueue/river/rivertype" - "github.com/stretchr/testify/require" ) func Test_SubscriptionManager(t *testing.T) {