Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,21 @@ func (s *Consumer) handle(m *job.Message) error {
}()

// run custom process function
if m.Task != nil {
done <- m.Task(ctx)
} else {
done <- s.runFunc(ctx, m)
var err error
for i := 0; i < (int(m.RetryCount) + 1); i++ {
if i != 0 {
time.Sleep(m.RetryDelay)
}
if m.Task != nil {
err = m.Task(ctx)
} else {
err = s.runFunc(ctx, m)
}
if err == nil {
break
}
}
done <- err
}()

select {
Expand Down
70 changes: 70 additions & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,3 +449,73 @@ func TestHandleAllJobBeforeShutdownConsumerInQueue(t *testing.T) {
q.Release()
assert.Len(t, messages, 2)
}

func TestRetryCountWithNewMessage(t *testing.T) {
controller := gomock.NewController(t)
defer controller.Finish()

m := mocks.NewMockQueuedMessage(controller)
m.EXPECT().Bytes().Return([]byte("test")).AnyTimes()

messages := make(chan string, 10)
count := 1

w := NewConsumer(
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
if count%3 != 0 {
count++
return errors.New("count not correct")
}
messages <- string(m.Bytes())
return nil
}),
)

q, err := NewQueue(
WithLogger(NewLogger()),
WithWorker(w),
WithWorkerCount(1),
)
assert.NoError(t, err)

assert.NoError(t, q.Queue(
m,
job.WithRetryCount(3),
job.WithRetryDelay(50*time.Millisecond),
))
assert.Len(t, messages, 0)
q.Start()
q.Release()
assert.Len(t, messages, 1)
}

func TestRetryCountWithNewTask(t *testing.T) {
messages := make(chan string, 10)
count := 1

w := NewConsumer()

q, err := NewQueue(
WithLogger(NewLogger()),
WithWorker(w),
WithWorkerCount(1),
)
assert.NoError(t, err)

assert.NoError(t, q.QueueTask(
func(ctx context.Context) error {
if count%3 != 0 {
count++
return errors.New("count not correct")
}
messages <- "foobar"
return nil
},
job.WithRetryCount(3),
job.WithRetryDelay(50*time.Millisecond),
))
assert.Len(t, messages, 0)
q.Start()
q.Release()
assert.Len(t, messages, 1)
}
7 changes: 5 additions & 2 deletions job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,18 @@ type Message struct {

// Timeout is the duration the task can be processed by Handler.
// zero if not specified
// default is 60 time.Minute
Timeout time.Duration `json:"timeout"`

// Payload is the payload data of the task.
Payload []byte `json:"body"`

// RetryCount retry count if failure
// RetryCount set count of retry
// default is 10
RetryCount int64 `json:"retry_count"`

// RetryCount retry count if failure
// RetryDelay set delay between retry
// default is 100ms
RetryDelay time.Duration `json:"retry_delay"`
}

Expand Down
9 changes: 7 additions & 2 deletions job/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,17 @@ func (f OptionFunc) apply(option *Options) {
f(option)
}

func NewOptions(opts ...Option) *Options {
o := &Options{
func newDefaultOptions() *Options {
return &Options{
retryCount: 0,
retryDelay: 100 * time.Millisecond,
timeout: 60 * time.Minute,
}
}

// NewOptions with custom parameter
func NewOptions(opts ...Option) *Options {
o := newDefaultOptions()

// Loop through each option
for _, opt := range opts {
Expand Down