diff --git a/client_test.go b/client_test.go index 01be1d81..a5923cb8 100644 --- a/client_test.go +++ b/client_test.go @@ -63,7 +63,7 @@ type noOpWorker struct { WorkerDefaults[noOpArgs] } -func (w *noOpWorker) Work(ctx context.Context, j *Job[noOpArgs]) error { return nil } +func (w *noOpWorker) Work(ctx context.Context, job *Job[noOpArgs]) error { return nil } type periodicJobArgs struct{} @@ -73,18 +73,18 @@ type periodicJobWorker struct { WorkerDefaults[periodicJobArgs] } -func (w *periodicJobWorker) Work(ctx context.Context, j *Job[periodicJobArgs]) error { +func (w *periodicJobWorker) Work(ctx context.Context, job *Job[periodicJobArgs]) error { return nil } type callbackFunc func(context.Context, *Job[callbackArgs]) error func makeAwaitCallback(startedCh chan<- int64, doneCh chan struct{}) callbackFunc { - return func(ctx context.Context, j *Job[callbackArgs]) error { + return func(ctx context.Context, job *Job[callbackArgs]) error { select { case <-ctx.Done(): return ctx.Err() - case startedCh <- j.ID: + case startedCh <- job.ID: } // await done signal, or context cancellation: @@ -108,8 +108,8 @@ type callbackWorker struct { fn callbackFunc } -func (w *callbackWorker) Work(ctx context.Context, j *Job[callbackArgs]) error { - return w.fn(ctx, j) +func (w *callbackWorker) Work(ctx context.Context, job *Job[callbackArgs]) error { + return w.fn(ctx, job) } func newTestConfig(t *testing.T, callback callbackFunc) *Config { @@ -397,11 +397,11 @@ func Test_Client_Stop(t *testing.T) { jobDoneChan := make(chan struct{}) jobStartedChan := make(chan int64) - callbackFunc := func(ctx context.Context, j *Job[callbackArgs]) error { + callbackFunc := func(ctx context.Context, job *Job[callbackArgs]) error { select { case <-ctx.Done(): return ctx.Err() - case jobStartedChan <- j.ID: + case jobStartedChan <- job.ID: } select { @@ -446,9 +446,9 @@ func Test_Client_Stop(t *testing.T) { t.Parallel() startedCh := make(chan int64) - callbackFunc := func(ctx context.Context, j *Job[callbackArgs]) error { + callbackFunc := func(ctx context.Context, job *Job[callbackArgs]) error { select { - case startedCh <- j.ID: + case startedCh <- job.ID: default: } return nil @@ -473,7 +473,7 @@ func Test_Client_Stop(t *testing.T) { t.Run("WithSubscriber", func(t *testing.T) { t.Parallel() - callbackFunc := func(ctx context.Context, j *Job[callbackArgs]) error { return nil } + callbackFunc := func(ctx context.Context, job *Job[callbackArgs]) error { return nil } client := runNewTestClient(ctx, t, newTestConfig(t, callbackFunc)) @@ -503,14 +503,14 @@ func Test_Client_StopAndCancel(t *testing.T) { jobDoneChan := make(chan struct{}) jobStartedChan := make(chan int64) - callbackFunc := func(ctx context.Context, j *Job[callbackArgs]) error { + callbackFunc := func(ctx context.Context, job *Job[callbackArgs]) error { defer close(jobDoneChan) // indicate the job has started, unless context is already done: select { case <-ctx.Done(): return ctx.Err() - case jobStartedChan <- j.ID: + case jobStartedChan <- job.ID: } t.Logf("Job waiting for context cancellation") @@ -571,12 +571,12 @@ type callbackWorkerWithCustomTimeout struct { fn func(context.Context, *Job[callbackWithCustomTimeoutArgs]) error } -func (w *callbackWorkerWithCustomTimeout) Work(ctx context.Context, j *Job[callbackWithCustomTimeoutArgs]) error { - return w.fn(ctx, j) +func (w *callbackWorkerWithCustomTimeout) Work(ctx context.Context, job *Job[callbackWithCustomTimeoutArgs]) error { + return w.fn(ctx, job) } -func (w *callbackWorkerWithCustomTimeout) Timeout(j *Job[callbackWithCustomTimeoutArgs]) time.Duration { - return j.Args.TimeoutValue +func (w *callbackWorkerWithCustomTimeout) Timeout(job *Job[callbackWithCustomTimeoutArgs]) time.Duration { + return job.Args.TimeoutValue } func Test_Client_JobContextInheritsFromProvidedContext(t *testing.T) { @@ -589,7 +589,7 @@ func Test_Client_JobContextInheritsFromProvidedContext(t *testing.T) { doneCh := make(chan struct{}) close(doneCh) - callbackFunc := func(ctx context.Context, j *Job[callbackWithCustomTimeoutArgs]) error { + callbackFunc := func(ctx context.Context, job *Job[callbackWithCustomTimeoutArgs]) error { // indicate the job has started, unless context is already done: select { case <-ctx.Done(): @@ -1073,7 +1073,7 @@ func Test_Client_ErrorHandler(t *testing.T) { t.Parallel() handlerErr := fmt.Errorf("job error") - config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error { + config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error { return handlerErr }) @@ -1127,7 +1127,7 @@ func Test_Client_ErrorHandler(t *testing.T) { t.Run("PanicHandler", func(t *testing.T) { t.Parallel() - config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error { + config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error { panic("panic val") }) @@ -1459,7 +1459,7 @@ func Test_Client_RetryPolicy(t *testing.T) { t.Run("RetryUntilDiscarded", func(t *testing.T) { t.Parallel() - config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error { + config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error { return fmt.Errorf("job error") }) @@ -1583,8 +1583,8 @@ func Test_Client_Subscribe(t *testing.T) { // Fail/succeed jobs based on their name so we can get a mix of both to // verify. - config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error { - if strings.HasPrefix(j.Args.Name, "failed") { + config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error { + if strings.HasPrefix(job.Args.Name, "failed") { return fmt.Errorf("job error") } return nil @@ -1649,8 +1649,8 @@ func Test_Client_Subscribe(t *testing.T) { t.Run("CompletedOnly", func(t *testing.T) { t.Parallel() - config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error { - if strings.HasPrefix(j.Args.Name, "failed") { + config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error { + if strings.HasPrefix(job.Args.Name, "failed") { return fmt.Errorf("job error") } return nil @@ -1690,8 +1690,8 @@ func Test_Client_Subscribe(t *testing.T) { t.Run("FailedOnly", func(t *testing.T) { t.Parallel() - config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error { - if strings.HasPrefix(j.Args.Name, "failed") { + config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error { + if strings.HasPrefix(job.Args.Name, "failed") { return fmt.Errorf("job error") } return nil @@ -1731,7 +1731,7 @@ func Test_Client_Subscribe(t *testing.T) { t.Run("EventsDropWithNoListeners", func(t *testing.T) { t.Parallel() - config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error { + config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error { return nil }) @@ -1775,7 +1775,7 @@ func Test_Client_Subscribe(t *testing.T) { t.Run("PanicOnUnknownKind", func(t *testing.T) { t.Parallel() - config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error { + config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error { return nil }) @@ -1789,7 +1789,7 @@ func Test_Client_Subscribe(t *testing.T) { t.Run("SubscriptionCancellation", func(t *testing.T) { t.Parallel() - config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error { + config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error { return nil }) @@ -1883,7 +1883,7 @@ func Test_Client_JobCompletion(t *testing.T) { t.Parallel() require := require.New(t) - config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error { + config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error { return nil }) @@ -1909,8 +1909,8 @@ func Test_Client_JobCompletion(t *testing.T) { require := require.New(t) var dbPool *pgxpool.Pool now := time.Now().UTC() - config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error { - _, err := queries.JobSetCompleted(ctx, dbPool, dbsqlc.JobSetCompletedParams{ID: j.ID, FinalizedAt: now}) + config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error { + _, err := queries.JobSetCompleted(ctx, dbPool, dbsqlc.JobSetCompletedParams{ID: job.ID, FinalizedAt: now}) require.NoError(err) return nil }) @@ -1936,7 +1936,7 @@ func Test_Client_JobCompletion(t *testing.T) { t.Parallel() require := require.New(t) - config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error { + config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error { return errors.New("oops") }) @@ -1961,7 +1961,7 @@ func Test_Client_JobCompletion(t *testing.T) { t.Parallel() require := require.New(t) - config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error { + config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error { return JobCancel(errors.New("oops")) }) @@ -1988,9 +1988,9 @@ func Test_Client_JobCompletion(t *testing.T) { require := require.New(t) var dbPool *pgxpool.Pool now := time.Now().UTC() - config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error { + config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error { _, err := queries.JobSetDiscarded(ctx, dbPool, dbsqlc.JobSetDiscardedParams{ - ID: j.ID, + ID: job.ID, Error: []byte("{\"error\": \"oops\"}"), FinalizedAt: now, }) @@ -2023,11 +2023,11 @@ func Test_Client_JobCompletion(t *testing.T) { now := time.Now().UTC() var updatedJob *Job[callbackArgs] - config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error { + config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error { tx, err := dbPool.Begin(ctx) require.NoError(err) - updatedJob, err = JobCompleteTx[*riverpgxv5.Driver](ctx, tx, j) + updatedJob, err = JobCompleteTx[*riverpgxv5.Driver](ctx, tx, job) require.NoError(err) return tx.Commit(ctx) @@ -2150,8 +2150,8 @@ func Test_NewClient_ClientIDWrittenToJobAttemptedByWhenFetched(t *testing.T) { doneCh := make(chan struct{}) startedCh := make(chan *Job[callbackArgs]) - callback := func(ctx context.Context, j *Job[callbackArgs]) error { - startedCh <- j + callback := func(ctx context.Context, job *Job[callbackArgs]) error { + startedCh <- job <-doneCh return nil } @@ -2529,8 +2529,8 @@ type timeoutTestWorker struct { doneCh chan testWorkerDeadline } -func (w *timeoutTestWorker) Timeout(j *Job[timeoutTestArgs]) time.Duration { - return j.Args.TimeoutValue +func (w *timeoutTestWorker) Timeout(job *Job[timeoutTestArgs]) time.Duration { + return job.Args.TimeoutValue } func (w *timeoutTestWorker) Work(ctx context.Context, job *Job[timeoutTestArgs]) error { diff --git a/example_error_handler_test.go b/example_error_handler_test.go index 070588a0..c165af25 100644 --- a/example_error_handler_test.go +++ b/example_error_handler_test.go @@ -45,11 +45,11 @@ type ErroringWorker struct { river.WorkerDefaults[ErroringArgs] } -func (w *ErroringWorker) Work(ctx context.Context, j *river.Job[ErroringArgs]) error { +func (w *ErroringWorker) Work(ctx context.Context, job *river.Job[ErroringArgs]) error { switch { - case j.Args.ShouldError: + case job.Args.ShouldError: return fmt.Errorf("this job errored") - case j.Args.ShouldPanic: + case job.Args.ShouldPanic: panic("this job panicked") } return nil diff --git a/example_job_cancel_test.go b/example_job_cancel_test.go index c808f64e..36f5a97e 100644 --- a/example_job_cancel_test.go +++ b/example_job_cancel_test.go @@ -23,8 +23,8 @@ type CancellingWorker struct { river.WorkerDefaults[CancellingArgs] } -func (w *CancellingWorker) Work(ctx context.Context, j *river.Job[CancellingArgs]) error { - if j.Args.ShouldCancel { +func (w *CancellingWorker) Work(ctx context.Context, job *river.Job[CancellingArgs]) error { + if job.Args.ShouldCancel { fmt.Println("cancelling job") return river.JobCancel(fmt.Errorf("this wrapped error message will be persisted to DB")) } diff --git a/example_job_snooze_test.go b/example_job_snooze_test.go index 6087188f..ce5d8e58 100644 --- a/example_job_snooze_test.go +++ b/example_job_snooze_test.go @@ -24,8 +24,8 @@ type SnoozingWorker struct { river.WorkerDefaults[SnoozingArgs] } -func (w *SnoozingWorker) Work(ctx context.Context, j *river.Job[SnoozingArgs]) error { - if j.Args.ShouldSnooze { +func (w *SnoozingWorker) Work(ctx context.Context, job *river.Job[SnoozingArgs]) error { + if job.Args.ShouldSnooze { fmt.Println("snoozing job for 5 minutes") return river.JobSnooze(5 * time.Minute) } diff --git a/example_work_func_test.go b/example_work_func_test.go index c76b98cf..989a1bb4 100644 --- a/example_work_func_test.go +++ b/example_work_func_test.go @@ -37,8 +37,8 @@ func Example_workFunc() { } workers := river.NewWorkers() - river.AddWorker(workers, river.WorkFunc(func(ctx context.Context, j *river.Job[WorkFuncArgs]) error { - fmt.Printf("Message: %s", j.Args.Message) + river.AddWorker(workers, river.WorkFunc(func(ctx context.Context, job *river.Job[WorkFuncArgs]) error { + fmt.Printf("Message: %s", job.Args.Message) return nil })) diff --git a/internal/cmd/producersample/main.go b/internal/cmd/producersample/main.go index 3de80a2b..bda0edba 100644 --- a/internal/cmd/producersample/main.go +++ b/internal/cmd/producersample/main.go @@ -214,10 +214,10 @@ type MyJobWorker struct { logger *slog.Logger } -func (w *MyJobWorker) Work(ctx context.Context, j *river.Job[MyJobArgs]) error { +func (w *MyJobWorker) Work(ctx context.Context, job *river.Job[MyJobArgs]) error { atomic.AddUint64(&w.jobsPerformed, 1) - // fmt.Printf("performing job %d with args %+v\n", j.ID, j.Args) - if j.ID%1000 == 0 { + // fmt.Printf("performing job %d with args %+v\n", job.ID, job.Args) + if job.ID%1000 == 0 { w.logger.Info("simulating stalled job, blocked on ctx.Done()") <-ctx.Done() w.logger.Info("stalled job exiting") diff --git a/job_args_reflect_kind_test.go b/job_args_reflect_kind_test.go index c1076383..5218c40a 100644 --- a/job_args_reflect_kind_test.go +++ b/job_args_reflect_kind_test.go @@ -13,7 +13,7 @@ import "reflect" // Message `json:"message"` // } // -// AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, j *Job[WorkFuncArgs]) error { +// AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[WorkFuncArgs]) error { // ... // // Its major downside compared to a normal JobArgs implementation is that it's diff --git a/job_executor_test.go b/job_executor_test.go index 5f2ccaab..94ed0d2d 100644 --- a/job_executor_test.go +++ b/job_executor_test.go @@ -35,7 +35,7 @@ func (w *customRetryPolicyWorker) NextRetry(job *Job[callbackArgs]) time.Time { return time.Time{} } -func (w *customRetryPolicyWorker) Work(ctx context.Context, j *Job[callbackArgs]) error { +func (w *customRetryPolicyWorker) Work(ctx context.Context, job *Job[callbackArgs]) error { return w.f() } diff --git a/rivertest/rivertest_test.go b/rivertest/rivertest_test.go index 3cd2e8c0..896e9bdb 100644 --- a/rivertest/rivertest_test.go +++ b/rivertest/rivertest_test.go @@ -29,7 +29,7 @@ type Job1Worker struct { river.WorkerDefaults[Job1Args] } -func (w *Job1Worker) Work(ctx context.Context, j *river.Job[Job1Args]) error { return nil } +func (w *Job1Worker) Work(ctx context.Context, job *river.Job[Job1Args]) error { return nil } type Job2Args struct { Int int `json:"int"` @@ -41,7 +41,7 @@ type Job2Worker struct { river.WorkerDefaults[Job2Args] } -func (w *Job2Worker) Work(ctx context.Context, j *river.Job[Job2Args]) error { return nil } +func (w *Job2Worker) Work(ctx context.Context, job *river.Job[Job2Args]) error { return nil } // The tests for this function are quite minimal because it uses the same // implementation as the `*Tx` variant, so most of the test happens below. diff --git a/worker.go b/worker.go index 185cf9c0..73714730 100644 --- a/worker.go +++ b/worker.go @@ -172,8 +172,8 @@ func (wf *workFunc[T]) Work(ctx context.Context, job *Job[T]) error { // // For example: // -// river.AddWorker(workers, river.WorkFunc(func(ctx context.Context, j *river.Job[WorkFuncArgs]) error { -// fmt.Printf("Message: %s", j.Args.Message) +// river.AddWorker(workers, river.WorkFunc(func(ctx context.Context, job *river.Job[WorkFuncArgs]) error { +// fmt.Printf("Message: %s", job.Args.Message) // return nil // })) func WorkFunc[T JobArgs](f func(context.Context, *Job[T]) error) Worker[T] { diff --git a/worker_test.go b/worker_test.go index 0ae938dd..bb08f8e7 100644 --- a/worker_test.go +++ b/worker_test.go @@ -22,7 +22,7 @@ func TestWork(t *testing.T) { AddWorker(workers, &noOpWorker{}) }) - fn := func(ctx context.Context, j *Job[callbackArgs]) error { return nil } + fn := func(ctx context.Context, job *Job[callbackArgs]) error { return nil } ch := callbackWorker{fn: fn} // function worker @@ -43,7 +43,7 @@ type configurableWorker struct { WorkerDefaults[configurableArgs] } -func (w *configurableWorker) Work(ctx context.Context, j *Job[configurableArgs]) error { +func (w *configurableWorker) Work(ctx context.Context, job *Job[configurableArgs]) error { return nil } @@ -71,7 +71,7 @@ type StructWithFunc struct { WorkChan chan struct{} } -func (s *StructWithFunc) Work(ctx context.Context, j *Job[WorkFuncArgs]) error { +func (s *StructWithFunc) Work(ctx context.Context, job *Job[WorkFuncArgs]) error { s.WorkChan <- struct{}{} return nil } @@ -98,7 +98,7 @@ func TestWorkFunc(t *testing.T) { client, _ := setup(t) workChan := make(chan struct{}) - AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, j *Job[WorkFuncArgs]) error { + AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[WorkFuncArgs]) error { workChan <- struct{}{} return nil })) @@ -136,7 +136,7 @@ func TestWorkFunc(t *testing.T) { } workChan := make(chan struct{}) - AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, j *Job[InFuncWorkFuncArgs]) error { + AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[InFuncWorkFuncArgs]) error { workChan <- struct{}{} return nil }))