Skip to content

Commit

Permalink
Consistently prefer job param name to j (#69)
Browse files Browse the repository at this point in the history
When using a job parameter name for a worker, we weren't consistent
about whether to use `j` or `job` and have mixed convention all over the
place:

    func (w *Worker) Work(ctx context.Context, j *river.Job[ErroringArgs]) error {
    func (w *Worker) Work(ctx context.Context, job *river.Job[ErroringArgs]) error {

Generally in non-core-team Go, a convention of more than single
character variable names is preferred except in specific cases (e.g.
`i`), so here, move everything over to use the more explicit `job`.
  • Loading branch information
brandur authored Nov 26, 2023
1 parent 4d141b3 commit 724cefc
Show file tree
Hide file tree
Showing 11 changed files with 66 additions and 66 deletions.
86 changes: 43 additions & 43 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type noOpWorker struct {
WorkerDefaults[noOpArgs]
}

func (w *noOpWorker) Work(ctx context.Context, j *Job[noOpArgs]) error { return nil }
func (w *noOpWorker) Work(ctx context.Context, job *Job[noOpArgs]) error { return nil }

type periodicJobArgs struct{}

Expand All @@ -73,18 +73,18 @@ type periodicJobWorker struct {
WorkerDefaults[periodicJobArgs]
}

func (w *periodicJobWorker) Work(ctx context.Context, j *Job[periodicJobArgs]) error {
func (w *periodicJobWorker) Work(ctx context.Context, job *Job[periodicJobArgs]) error {
return nil
}

type callbackFunc func(context.Context, *Job[callbackArgs]) error

func makeAwaitCallback(startedCh chan<- int64, doneCh chan struct{}) callbackFunc {
return func(ctx context.Context, j *Job[callbackArgs]) error {
return func(ctx context.Context, job *Job[callbackArgs]) error {
select {
case <-ctx.Done():
return ctx.Err()
case startedCh <- j.ID:
case startedCh <- job.ID:
}

// await done signal, or context cancellation:
Expand All @@ -108,8 +108,8 @@ type callbackWorker struct {
fn callbackFunc
}

func (w *callbackWorker) Work(ctx context.Context, j *Job[callbackArgs]) error {
return w.fn(ctx, j)
func (w *callbackWorker) Work(ctx context.Context, job *Job[callbackArgs]) error {
return w.fn(ctx, job)
}

func newTestConfig(t *testing.T, callback callbackFunc) *Config {
Expand Down Expand Up @@ -397,11 +397,11 @@ func Test_Client_Stop(t *testing.T) {
jobDoneChan := make(chan struct{})
jobStartedChan := make(chan int64)

callbackFunc := func(ctx context.Context, j *Job[callbackArgs]) error {
callbackFunc := func(ctx context.Context, job *Job[callbackArgs]) error {
select {
case <-ctx.Done():
return ctx.Err()
case jobStartedChan <- j.ID:
case jobStartedChan <- job.ID:
}

select {
Expand Down Expand Up @@ -446,9 +446,9 @@ func Test_Client_Stop(t *testing.T) {
t.Parallel()

startedCh := make(chan int64)
callbackFunc := func(ctx context.Context, j *Job[callbackArgs]) error {
callbackFunc := func(ctx context.Context, job *Job[callbackArgs]) error {
select {
case startedCh <- j.ID:
case startedCh <- job.ID:
default:
}
return nil
Expand All @@ -473,7 +473,7 @@ func Test_Client_Stop(t *testing.T) {
t.Run("WithSubscriber", func(t *testing.T) {
t.Parallel()

callbackFunc := func(ctx context.Context, j *Job[callbackArgs]) error { return nil }
callbackFunc := func(ctx context.Context, job *Job[callbackArgs]) error { return nil }

client := runNewTestClient(ctx, t, newTestConfig(t, callbackFunc))

Expand Down Expand Up @@ -503,14 +503,14 @@ func Test_Client_StopAndCancel(t *testing.T) {
jobDoneChan := make(chan struct{})
jobStartedChan := make(chan int64)

callbackFunc := func(ctx context.Context, j *Job[callbackArgs]) error {
callbackFunc := func(ctx context.Context, job *Job[callbackArgs]) error {
defer close(jobDoneChan)

// indicate the job has started, unless context is already done:
select {
case <-ctx.Done():
return ctx.Err()
case jobStartedChan <- j.ID:
case jobStartedChan <- job.ID:
}

t.Logf("Job waiting for context cancellation")
Expand Down Expand Up @@ -571,12 +571,12 @@ type callbackWorkerWithCustomTimeout struct {
fn func(context.Context, *Job[callbackWithCustomTimeoutArgs]) error
}

func (w *callbackWorkerWithCustomTimeout) Work(ctx context.Context, j *Job[callbackWithCustomTimeoutArgs]) error {
return w.fn(ctx, j)
func (w *callbackWorkerWithCustomTimeout) Work(ctx context.Context, job *Job[callbackWithCustomTimeoutArgs]) error {
return w.fn(ctx, job)
}

func (w *callbackWorkerWithCustomTimeout) Timeout(j *Job[callbackWithCustomTimeoutArgs]) time.Duration {
return j.Args.TimeoutValue
func (w *callbackWorkerWithCustomTimeout) Timeout(job *Job[callbackWithCustomTimeoutArgs]) time.Duration {
return job.Args.TimeoutValue
}

func Test_Client_JobContextInheritsFromProvidedContext(t *testing.T) {
Expand All @@ -589,7 +589,7 @@ func Test_Client_JobContextInheritsFromProvidedContext(t *testing.T) {
doneCh := make(chan struct{})
close(doneCh)

callbackFunc := func(ctx context.Context, j *Job[callbackWithCustomTimeoutArgs]) error {
callbackFunc := func(ctx context.Context, job *Job[callbackWithCustomTimeoutArgs]) error {
// indicate the job has started, unless context is already done:
select {
case <-ctx.Done():
Expand Down Expand Up @@ -1073,7 +1073,7 @@ func Test_Client_ErrorHandler(t *testing.T) {
t.Parallel()

handlerErr := fmt.Errorf("job error")
config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
return handlerErr
})

Expand Down Expand Up @@ -1127,7 +1127,7 @@ func Test_Client_ErrorHandler(t *testing.T) {
t.Run("PanicHandler", func(t *testing.T) {
t.Parallel()

config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
panic("panic val")
})

Expand Down Expand Up @@ -1459,7 +1459,7 @@ func Test_Client_RetryPolicy(t *testing.T) {
t.Run("RetryUntilDiscarded", func(t *testing.T) {
t.Parallel()

config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
return fmt.Errorf("job error")
})

Expand Down Expand Up @@ -1583,8 +1583,8 @@ func Test_Client_Subscribe(t *testing.T) {

// Fail/succeed jobs based on their name so we can get a mix of both to
// verify.
config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
if strings.HasPrefix(j.Args.Name, "failed") {
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
if strings.HasPrefix(job.Args.Name, "failed") {
return fmt.Errorf("job error")
}
return nil
Expand Down Expand Up @@ -1649,8 +1649,8 @@ func Test_Client_Subscribe(t *testing.T) {
t.Run("CompletedOnly", func(t *testing.T) {
t.Parallel()

config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
if strings.HasPrefix(j.Args.Name, "failed") {
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
if strings.HasPrefix(job.Args.Name, "failed") {
return fmt.Errorf("job error")
}
return nil
Expand Down Expand Up @@ -1690,8 +1690,8 @@ func Test_Client_Subscribe(t *testing.T) {
t.Run("FailedOnly", func(t *testing.T) {
t.Parallel()

config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
if strings.HasPrefix(j.Args.Name, "failed") {
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
if strings.HasPrefix(job.Args.Name, "failed") {
return fmt.Errorf("job error")
}
return nil
Expand Down Expand Up @@ -1731,7 +1731,7 @@ func Test_Client_Subscribe(t *testing.T) {
t.Run("EventsDropWithNoListeners", func(t *testing.T) {
t.Parallel()

config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
return nil
})

Expand Down Expand Up @@ -1775,7 +1775,7 @@ func Test_Client_Subscribe(t *testing.T) {
t.Run("PanicOnUnknownKind", func(t *testing.T) {
t.Parallel()

config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
return nil
})

Expand All @@ -1789,7 +1789,7 @@ func Test_Client_Subscribe(t *testing.T) {
t.Run("SubscriptionCancellation", func(t *testing.T) {
t.Parallel()

config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
return nil
})

Expand Down Expand Up @@ -1883,7 +1883,7 @@ func Test_Client_JobCompletion(t *testing.T) {
t.Parallel()

require := require.New(t)
config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
return nil
})

Expand All @@ -1909,8 +1909,8 @@ func Test_Client_JobCompletion(t *testing.T) {
require := require.New(t)
var dbPool *pgxpool.Pool
now := time.Now().UTC()
config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
_, err := queries.JobSetCompleted(ctx, dbPool, dbsqlc.JobSetCompletedParams{ID: j.ID, FinalizedAt: now})
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
_, err := queries.JobSetCompleted(ctx, dbPool, dbsqlc.JobSetCompletedParams{ID: job.ID, FinalizedAt: now})
require.NoError(err)
return nil
})
Expand All @@ -1936,7 +1936,7 @@ func Test_Client_JobCompletion(t *testing.T) {
t.Parallel()

require := require.New(t)
config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
return errors.New("oops")
})

Expand All @@ -1961,7 +1961,7 @@ func Test_Client_JobCompletion(t *testing.T) {
t.Parallel()

require := require.New(t)
config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
return JobCancel(errors.New("oops"))
})

Expand All @@ -1988,9 +1988,9 @@ func Test_Client_JobCompletion(t *testing.T) {
require := require.New(t)
var dbPool *pgxpool.Pool
now := time.Now().UTC()
config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
_, err := queries.JobSetDiscarded(ctx, dbPool, dbsqlc.JobSetDiscardedParams{
ID: j.ID,
ID: job.ID,
Error: []byte("{\"error\": \"oops\"}"),
FinalizedAt: now,
})
Expand Down Expand Up @@ -2023,11 +2023,11 @@ func Test_Client_JobCompletion(t *testing.T) {
now := time.Now().UTC()
var updatedJob *Job[callbackArgs]

config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
tx, err := dbPool.Begin(ctx)
require.NoError(err)

updatedJob, err = JobCompleteTx[*riverpgxv5.Driver](ctx, tx, j)
updatedJob, err = JobCompleteTx[*riverpgxv5.Driver](ctx, tx, job)
require.NoError(err)

return tx.Commit(ctx)
Expand Down Expand Up @@ -2150,8 +2150,8 @@ func Test_NewClient_ClientIDWrittenToJobAttemptedByWhenFetched(t *testing.T) {
doneCh := make(chan struct{})
startedCh := make(chan *Job[callbackArgs])

callback := func(ctx context.Context, j *Job[callbackArgs]) error {
startedCh <- j
callback := func(ctx context.Context, job *Job[callbackArgs]) error {
startedCh <- job
<-doneCh
return nil
}
Expand Down Expand Up @@ -2529,8 +2529,8 @@ type timeoutTestWorker struct {
doneCh chan testWorkerDeadline
}

func (w *timeoutTestWorker) Timeout(j *Job[timeoutTestArgs]) time.Duration {
return j.Args.TimeoutValue
func (w *timeoutTestWorker) Timeout(job *Job[timeoutTestArgs]) time.Duration {
return job.Args.TimeoutValue
}

func (w *timeoutTestWorker) Work(ctx context.Context, job *Job[timeoutTestArgs]) error {
Expand Down
6 changes: 3 additions & 3 deletions example_error_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ type ErroringWorker struct {
river.WorkerDefaults[ErroringArgs]
}

func (w *ErroringWorker) Work(ctx context.Context, j *river.Job[ErroringArgs]) error {
func (w *ErroringWorker) Work(ctx context.Context, job *river.Job[ErroringArgs]) error {
switch {
case j.Args.ShouldError:
case job.Args.ShouldError:
return fmt.Errorf("this job errored")
case j.Args.ShouldPanic:
case job.Args.ShouldPanic:
panic("this job panicked")
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions example_job_cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ type CancellingWorker struct {
river.WorkerDefaults[CancellingArgs]
}

func (w *CancellingWorker) Work(ctx context.Context, j *river.Job[CancellingArgs]) error {
if j.Args.ShouldCancel {
func (w *CancellingWorker) Work(ctx context.Context, job *river.Job[CancellingArgs]) error {
if job.Args.ShouldCancel {
fmt.Println("cancelling job")
return river.JobCancel(fmt.Errorf("this wrapped error message will be persisted to DB"))
}
Expand Down
4 changes: 2 additions & 2 deletions example_job_snooze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ type SnoozingWorker struct {
river.WorkerDefaults[SnoozingArgs]
}

func (w *SnoozingWorker) Work(ctx context.Context, j *river.Job[SnoozingArgs]) error {
if j.Args.ShouldSnooze {
func (w *SnoozingWorker) Work(ctx context.Context, job *river.Job[SnoozingArgs]) error {
if job.Args.ShouldSnooze {
fmt.Println("snoozing job for 5 minutes")
return river.JobSnooze(5 * time.Minute)
}
Expand Down
4 changes: 2 additions & 2 deletions example_work_func_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func Example_workFunc() {
}

workers := river.NewWorkers()
river.AddWorker(workers, river.WorkFunc(func(ctx context.Context, j *river.Job[WorkFuncArgs]) error {
fmt.Printf("Message: %s", j.Args.Message)
river.AddWorker(workers, river.WorkFunc(func(ctx context.Context, job *river.Job[WorkFuncArgs]) error {
fmt.Printf("Message: %s", job.Args.Message)
return nil
}))

Expand Down
6 changes: 3 additions & 3 deletions internal/cmd/producersample/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,10 @@ type MyJobWorker struct {
logger *slog.Logger
}

func (w *MyJobWorker) Work(ctx context.Context, j *river.Job[MyJobArgs]) error {
func (w *MyJobWorker) Work(ctx context.Context, job *river.Job[MyJobArgs]) error {
atomic.AddUint64(&w.jobsPerformed, 1)
// fmt.Printf("performing job %d with args %+v\n", j.ID, j.Args)
if j.ID%1000 == 0 {
// fmt.Printf("performing job %d with args %+v\n", job.ID, job.Args)
if job.ID%1000 == 0 {
w.logger.Info("simulating stalled job, blocked on ctx.Done()")
<-ctx.Done()
w.logger.Info("stalled job exiting")
Expand Down
2 changes: 1 addition & 1 deletion job_args_reflect_kind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import "reflect"
// Message `json:"message"`
// }
//
// AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, j *Job[WorkFuncArgs]) error {
// AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[WorkFuncArgs]) error {
// ...
//
// Its major downside compared to a normal JobArgs implementation is that it's
Expand Down
2 changes: 1 addition & 1 deletion job_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (w *customRetryPolicyWorker) NextRetry(job *Job[callbackArgs]) time.Time {
return time.Time{}
}

func (w *customRetryPolicyWorker) Work(ctx context.Context, j *Job[callbackArgs]) error {
func (w *customRetryPolicyWorker) Work(ctx context.Context, job *Job[callbackArgs]) error {
return w.f()
}

Expand Down
4 changes: 2 additions & 2 deletions rivertest/rivertest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Job1Worker struct {
river.WorkerDefaults[Job1Args]
}

func (w *Job1Worker) Work(ctx context.Context, j *river.Job[Job1Args]) error { return nil }
func (w *Job1Worker) Work(ctx context.Context, job *river.Job[Job1Args]) error { return nil }

type Job2Args struct {
Int int `json:"int"`
Expand All @@ -41,7 +41,7 @@ type Job2Worker struct {
river.WorkerDefaults[Job2Args]
}

func (w *Job2Worker) Work(ctx context.Context, j *river.Job[Job2Args]) error { return nil }
func (w *Job2Worker) Work(ctx context.Context, job *river.Job[Job2Args]) error { return nil }

// The tests for this function are quite minimal because it uses the same
// implementation as the `*Tx` variant, so most of the test happens below.
Expand Down
Loading

0 comments on commit 724cefc

Please sign in to comment.