Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consistently prefer job param name to j #69

Merged
merged 1 commit into from
Nov 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 43 additions & 43 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type noOpWorker struct {
WorkerDefaults[noOpArgs]
}

func (w *noOpWorker) Work(ctx context.Context, j *Job[noOpArgs]) error { return nil }
func (w *noOpWorker) Work(ctx context.Context, job *Job[noOpArgs]) error { return nil }

type periodicJobArgs struct{}

Expand All @@ -73,18 +73,18 @@ type periodicJobWorker struct {
WorkerDefaults[periodicJobArgs]
}

func (w *periodicJobWorker) Work(ctx context.Context, j *Job[periodicJobArgs]) error {
func (w *periodicJobWorker) Work(ctx context.Context, job *Job[periodicJobArgs]) error {
return nil
}

type callbackFunc func(context.Context, *Job[callbackArgs]) error

func makeAwaitCallback(startedCh chan<- int64, doneCh chan struct{}) callbackFunc {
return func(ctx context.Context, j *Job[callbackArgs]) error {
return func(ctx context.Context, job *Job[callbackArgs]) error {
select {
case <-ctx.Done():
return ctx.Err()
case startedCh <- j.ID:
case startedCh <- job.ID:
}

// await done signal, or context cancellation:
Expand All @@ -108,8 +108,8 @@ type callbackWorker struct {
fn callbackFunc
}

func (w *callbackWorker) Work(ctx context.Context, j *Job[callbackArgs]) error {
return w.fn(ctx, j)
func (w *callbackWorker) Work(ctx context.Context, job *Job[callbackArgs]) error {
return w.fn(ctx, job)
}

func newTestConfig(t *testing.T, callback callbackFunc) *Config {
Expand Down Expand Up @@ -397,11 +397,11 @@ func Test_Client_Stop(t *testing.T) {
jobDoneChan := make(chan struct{})
jobStartedChan := make(chan int64)

callbackFunc := func(ctx context.Context, j *Job[callbackArgs]) error {
callbackFunc := func(ctx context.Context, job *Job[callbackArgs]) error {
select {
case <-ctx.Done():
return ctx.Err()
case jobStartedChan <- j.ID:
case jobStartedChan <- job.ID:
}

select {
Expand Down Expand Up @@ -446,9 +446,9 @@ func Test_Client_Stop(t *testing.T) {
t.Parallel()

startedCh := make(chan int64)
callbackFunc := func(ctx context.Context, j *Job[callbackArgs]) error {
callbackFunc := func(ctx context.Context, job *Job[callbackArgs]) error {
select {
case startedCh <- j.ID:
case startedCh <- job.ID:
default:
}
return nil
Expand All @@ -473,7 +473,7 @@ func Test_Client_Stop(t *testing.T) {
t.Run("WithSubscriber", func(t *testing.T) {
t.Parallel()

callbackFunc := func(ctx context.Context, j *Job[callbackArgs]) error { return nil }
callbackFunc := func(ctx context.Context, job *Job[callbackArgs]) error { return nil }

client := runNewTestClient(ctx, t, newTestConfig(t, callbackFunc))

Expand Down Expand Up @@ -503,14 +503,14 @@ func Test_Client_StopAndCancel(t *testing.T) {
jobDoneChan := make(chan struct{})
jobStartedChan := make(chan int64)

callbackFunc := func(ctx context.Context, j *Job[callbackArgs]) error {
callbackFunc := func(ctx context.Context, job *Job[callbackArgs]) error {
defer close(jobDoneChan)

// indicate the job has started, unless context is already done:
select {
case <-ctx.Done():
return ctx.Err()
case jobStartedChan <- j.ID:
case jobStartedChan <- job.ID:
}

t.Logf("Job waiting for context cancellation")
Expand Down Expand Up @@ -571,12 +571,12 @@ type callbackWorkerWithCustomTimeout struct {
fn func(context.Context, *Job[callbackWithCustomTimeoutArgs]) error
}

func (w *callbackWorkerWithCustomTimeout) Work(ctx context.Context, j *Job[callbackWithCustomTimeoutArgs]) error {
return w.fn(ctx, j)
func (w *callbackWorkerWithCustomTimeout) Work(ctx context.Context, job *Job[callbackWithCustomTimeoutArgs]) error {
return w.fn(ctx, job)
}

func (w *callbackWorkerWithCustomTimeout) Timeout(j *Job[callbackWithCustomTimeoutArgs]) time.Duration {
return j.Args.TimeoutValue
func (w *callbackWorkerWithCustomTimeout) Timeout(job *Job[callbackWithCustomTimeoutArgs]) time.Duration {
return job.Args.TimeoutValue
}

func Test_Client_JobContextInheritsFromProvidedContext(t *testing.T) {
Expand All @@ -589,7 +589,7 @@ func Test_Client_JobContextInheritsFromProvidedContext(t *testing.T) {
doneCh := make(chan struct{})
close(doneCh)

callbackFunc := func(ctx context.Context, j *Job[callbackWithCustomTimeoutArgs]) error {
callbackFunc := func(ctx context.Context, job *Job[callbackWithCustomTimeoutArgs]) error {
// indicate the job has started, unless context is already done:
select {
case <-ctx.Done():
Expand Down Expand Up @@ -1073,7 +1073,7 @@ func Test_Client_ErrorHandler(t *testing.T) {
t.Parallel()

handlerErr := fmt.Errorf("job error")
config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
return handlerErr
})

Expand Down Expand Up @@ -1127,7 +1127,7 @@ func Test_Client_ErrorHandler(t *testing.T) {
t.Run("PanicHandler", func(t *testing.T) {
t.Parallel()

config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
panic("panic val")
})

Expand Down Expand Up @@ -1459,7 +1459,7 @@ func Test_Client_RetryPolicy(t *testing.T) {
t.Run("RetryUntilDiscarded", func(t *testing.T) {
t.Parallel()

config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
return fmt.Errorf("job error")
})

Expand Down Expand Up @@ -1583,8 +1583,8 @@ func Test_Client_Subscribe(t *testing.T) {

// Fail/succeed jobs based on their name so we can get a mix of both to
// verify.
config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
if strings.HasPrefix(j.Args.Name, "failed") {
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
if strings.HasPrefix(job.Args.Name, "failed") {
return fmt.Errorf("job error")
}
return nil
Expand Down Expand Up @@ -1649,8 +1649,8 @@ func Test_Client_Subscribe(t *testing.T) {
t.Run("CompletedOnly", func(t *testing.T) {
t.Parallel()

config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
if strings.HasPrefix(j.Args.Name, "failed") {
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
if strings.HasPrefix(job.Args.Name, "failed") {
return fmt.Errorf("job error")
}
return nil
Expand Down Expand Up @@ -1690,8 +1690,8 @@ func Test_Client_Subscribe(t *testing.T) {
t.Run("FailedOnly", func(t *testing.T) {
t.Parallel()

config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
if strings.HasPrefix(j.Args.Name, "failed") {
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
if strings.HasPrefix(job.Args.Name, "failed") {
return fmt.Errorf("job error")
}
return nil
Expand Down Expand Up @@ -1731,7 +1731,7 @@ func Test_Client_Subscribe(t *testing.T) {
t.Run("EventsDropWithNoListeners", func(t *testing.T) {
t.Parallel()

config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
return nil
})

Expand Down Expand Up @@ -1775,7 +1775,7 @@ func Test_Client_Subscribe(t *testing.T) {
t.Run("PanicOnUnknownKind", func(t *testing.T) {
t.Parallel()

config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
return nil
})

Expand All @@ -1789,7 +1789,7 @@ func Test_Client_Subscribe(t *testing.T) {
t.Run("SubscriptionCancellation", func(t *testing.T) {
t.Parallel()

config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
return nil
})

Expand Down Expand Up @@ -1883,7 +1883,7 @@ func Test_Client_JobCompletion(t *testing.T) {
t.Parallel()

require := require.New(t)
config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
return nil
})

Expand All @@ -1909,8 +1909,8 @@ func Test_Client_JobCompletion(t *testing.T) {
require := require.New(t)
var dbPool *pgxpool.Pool
now := time.Now().UTC()
config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
_, err := queries.JobSetCompleted(ctx, dbPool, dbsqlc.JobSetCompletedParams{ID: j.ID, FinalizedAt: now})
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
_, err := queries.JobSetCompleted(ctx, dbPool, dbsqlc.JobSetCompletedParams{ID: job.ID, FinalizedAt: now})
require.NoError(err)
return nil
})
Expand All @@ -1936,7 +1936,7 @@ func Test_Client_JobCompletion(t *testing.T) {
t.Parallel()

require := require.New(t)
config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
return errors.New("oops")
})

Expand All @@ -1961,7 +1961,7 @@ func Test_Client_JobCompletion(t *testing.T) {
t.Parallel()

require := require.New(t)
config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
return JobCancel(errors.New("oops"))
})

Expand All @@ -1988,9 +1988,9 @@ func Test_Client_JobCompletion(t *testing.T) {
require := require.New(t)
var dbPool *pgxpool.Pool
now := time.Now().UTC()
config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
_, err := queries.JobSetDiscarded(ctx, dbPool, dbsqlc.JobSetDiscardedParams{
ID: j.ID,
ID: job.ID,
Error: []byte("{\"error\": \"oops\"}"),
FinalizedAt: now,
})
Expand Down Expand Up @@ -2023,11 +2023,11 @@ func Test_Client_JobCompletion(t *testing.T) {
now := time.Now().UTC()
var updatedJob *Job[callbackArgs]

config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
tx, err := dbPool.Begin(ctx)
require.NoError(err)

updatedJob, err = JobCompleteTx[*riverpgxv5.Driver](ctx, tx, j)
updatedJob, err = JobCompleteTx[*riverpgxv5.Driver](ctx, tx, job)
require.NoError(err)

return tx.Commit(ctx)
Expand Down Expand Up @@ -2150,8 +2150,8 @@ func Test_NewClient_ClientIDWrittenToJobAttemptedByWhenFetched(t *testing.T) {
doneCh := make(chan struct{})
startedCh := make(chan *Job[callbackArgs])

callback := func(ctx context.Context, j *Job[callbackArgs]) error {
startedCh <- j
callback := func(ctx context.Context, job *Job[callbackArgs]) error {
startedCh <- job
<-doneCh
return nil
}
Expand Down Expand Up @@ -2529,8 +2529,8 @@ type timeoutTestWorker struct {
doneCh chan testWorkerDeadline
}

func (w *timeoutTestWorker) Timeout(j *Job[timeoutTestArgs]) time.Duration {
return j.Args.TimeoutValue
func (w *timeoutTestWorker) Timeout(job *Job[timeoutTestArgs]) time.Duration {
return job.Args.TimeoutValue
}

func (w *timeoutTestWorker) Work(ctx context.Context, job *Job[timeoutTestArgs]) error {
Expand Down
6 changes: 3 additions & 3 deletions example_error_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ type ErroringWorker struct {
river.WorkerDefaults[ErroringArgs]
}

func (w *ErroringWorker) Work(ctx context.Context, j *river.Job[ErroringArgs]) error {
func (w *ErroringWorker) Work(ctx context.Context, job *river.Job[ErroringArgs]) error {
switch {
case j.Args.ShouldError:
case job.Args.ShouldError:
return fmt.Errorf("this job errored")
case j.Args.ShouldPanic:
case job.Args.ShouldPanic:
panic("this job panicked")
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions example_job_cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ type CancellingWorker struct {
river.WorkerDefaults[CancellingArgs]
}

func (w *CancellingWorker) Work(ctx context.Context, j *river.Job[CancellingArgs]) error {
if j.Args.ShouldCancel {
func (w *CancellingWorker) Work(ctx context.Context, job *river.Job[CancellingArgs]) error {
if job.Args.ShouldCancel {
fmt.Println("cancelling job")
return river.JobCancel(fmt.Errorf("this wrapped error message will be persisted to DB"))
}
Expand Down
4 changes: 2 additions & 2 deletions example_job_snooze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ type SnoozingWorker struct {
river.WorkerDefaults[SnoozingArgs]
}

func (w *SnoozingWorker) Work(ctx context.Context, j *river.Job[SnoozingArgs]) error {
if j.Args.ShouldSnooze {
func (w *SnoozingWorker) Work(ctx context.Context, job *river.Job[SnoozingArgs]) error {
if job.Args.ShouldSnooze {
fmt.Println("snoozing job for 5 minutes")
return river.JobSnooze(5 * time.Minute)
}
Expand Down
4 changes: 2 additions & 2 deletions example_work_func_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func Example_workFunc() {
}

workers := river.NewWorkers()
river.AddWorker(workers, river.WorkFunc(func(ctx context.Context, j *river.Job[WorkFuncArgs]) error {
fmt.Printf("Message: %s", j.Args.Message)
river.AddWorker(workers, river.WorkFunc(func(ctx context.Context, job *river.Job[WorkFuncArgs]) error {
fmt.Printf("Message: %s", job.Args.Message)
return nil
}))

Expand Down
6 changes: 3 additions & 3 deletions internal/cmd/producersample/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,10 @@ type MyJobWorker struct {
logger *slog.Logger
}

func (w *MyJobWorker) Work(ctx context.Context, j *river.Job[MyJobArgs]) error {
func (w *MyJobWorker) Work(ctx context.Context, job *river.Job[MyJobArgs]) error {
atomic.AddUint64(&w.jobsPerformed, 1)
// fmt.Printf("performing job %d with args %+v\n", j.ID, j.Args)
if j.ID%1000 == 0 {
// fmt.Printf("performing job %d with args %+v\n", job.ID, job.Args)
if job.ID%1000 == 0 {
w.logger.Info("simulating stalled job, blocked on ctx.Done()")
<-ctx.Done()
w.logger.Info("stalled job exiting")
Expand Down
2 changes: 1 addition & 1 deletion job_args_reflect_kind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import "reflect"
// Message `json:"message"`
// }
//
// AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, j *Job[WorkFuncArgs]) error {
// AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[WorkFuncArgs]) error {
// ...
//
// Its major downside compared to a normal JobArgs implementation is that it's
Expand Down
2 changes: 1 addition & 1 deletion job_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (w *customRetryPolicyWorker) NextRetry(job *Job[callbackArgs]) time.Time {
return time.Time{}
}

func (w *customRetryPolicyWorker) Work(ctx context.Context, j *Job[callbackArgs]) error {
func (w *customRetryPolicyWorker) Work(ctx context.Context, job *Job[callbackArgs]) error {
return w.f()
}

Expand Down
4 changes: 2 additions & 2 deletions rivertest/rivertest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Job1Worker struct {
river.WorkerDefaults[Job1Args]
}

func (w *Job1Worker) Work(ctx context.Context, j *river.Job[Job1Args]) error { return nil }
func (w *Job1Worker) Work(ctx context.Context, job *river.Job[Job1Args]) error { return nil }

type Job2Args struct {
Int int `json:"int"`
Expand All @@ -41,7 +41,7 @@ type Job2Worker struct {
river.WorkerDefaults[Job2Args]
}

func (w *Job2Worker) Work(ctx context.Context, j *river.Job[Job2Args]) error { return nil }
func (w *Job2Worker) Work(ctx context.Context, job *river.Job[Job2Args]) error { return nil }

// The tests for this function are quite minimal because it uses the same
// implementation as the `*Tx` variant, so most of the test happens below.
Expand Down
Loading
Loading