From 73c8a83e4e0490a9f58588eb806b2e250fb1e285 Mon Sep 17 00:00:00 2001 From: "Bo-Yi.Wu" Date: Sun, 30 Oct 2022 17:39:25 +0800 Subject: [PATCH] chore(job): support retry count and delay between retry Signed-off-by: Bo-Yi.Wu --- consumer.go | 18 ++++++++++--- consumer_test.go | 70 ++++++++++++++++++++++++++++++++++++++++++++++++ job/job.go | 7 +++-- job/option.go | 9 +++++-- 4 files changed, 96 insertions(+), 8 deletions(-) diff --git a/consumer.go b/consumer.go index b8c3b26..d266103 100644 --- a/consumer.go +++ b/consumer.go @@ -47,11 +47,21 @@ func (s *Consumer) handle(m *job.Message) error { }() // run custom process function - if m.Task != nil { - done <- m.Task(ctx) - } else { - done <- s.runFunc(ctx, m) + var err error + for i := 0; i < (int(m.RetryCount) + 1); i++ { + if i != 0 { + time.Sleep(m.RetryDelay) + } + if m.Task != nil { + err = m.Task(ctx) + } else { + err = s.runFunc(ctx, m) + } + if err == nil { + break + } } + done <- err }() select { diff --git a/consumer_test.go b/consumer_test.go index a4e5626..e8c37b4 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -449,3 +449,73 @@ func TestHandleAllJobBeforeShutdownConsumerInQueue(t *testing.T) { q.Release() assert.Len(t, messages, 2) } + +func TestRetryCountWithNewMessage(t *testing.T) { + controller := gomock.NewController(t) + defer controller.Finish() + + m := mocks.NewMockQueuedMessage(controller) + m.EXPECT().Bytes().Return([]byte("test")).AnyTimes() + + messages := make(chan string, 10) + count := 1 + + w := NewConsumer( + WithFn(func(ctx context.Context, m core.QueuedMessage) error { + if count%3 != 0 { + count++ + return errors.New("count not correct") + } + messages <- string(m.Bytes()) + return nil + }), + ) + + q, err := NewQueue( + WithLogger(NewLogger()), + WithWorker(w), + WithWorkerCount(1), + ) + assert.NoError(t, err) + + assert.NoError(t, q.Queue( + m, + job.WithRetryCount(3), + job.WithRetryDelay(50*time.Millisecond), + )) + assert.Len(t, messages, 0) + q.Start() + q.Release() + assert.Len(t, messages, 1) +} + +func TestRetryCountWithNewTask(t *testing.T) { + messages := make(chan string, 10) + count := 1 + + w := NewConsumer() + + q, err := NewQueue( + WithLogger(NewLogger()), + WithWorker(w), + WithWorkerCount(1), + ) + assert.NoError(t, err) + + assert.NoError(t, q.QueueTask( + func(ctx context.Context) error { + if count%3 != 0 { + count++ + return errors.New("count not correct") + } + messages <- "foobar" + return nil + }, + job.WithRetryCount(3), + job.WithRetryDelay(50*time.Millisecond), + )) + assert.Len(t, messages, 0) + q.Start() + q.Release() + assert.Len(t, messages, 1) +} diff --git a/job/job.go b/job/job.go index c0b3dee..ea16c33 100644 --- a/job/job.go +++ b/job/job.go @@ -17,15 +17,18 @@ type Message struct { // Timeout is the duration the task can be processed by Handler. // zero if not specified + // default is 60 time.Minute Timeout time.Duration `json:"timeout"` // Payload is the payload data of the task. Payload []byte `json:"body"` - // RetryCount retry count if failure + // RetryCount set count of retry + // default is 10 RetryCount int64 `json:"retry_count"` - // RetryCount retry count if failure + // RetryDelay set delay between retry + // default is 100ms RetryDelay time.Duration `json:"retry_delay"` } diff --git a/job/option.go b/job/option.go index 175c660..a0be4a9 100644 --- a/job/option.go +++ b/job/option.go @@ -21,12 +21,17 @@ func (f OptionFunc) apply(option *Options) { f(option) } -func NewOptions(opts ...Option) *Options { - o := &Options{ +func newDefaultOptions() *Options { + return &Options{ retryCount: 0, retryDelay: 100 * time.Millisecond, timeout: 60 * time.Minute, } +} + +// NewOptions with custom parameter +func NewOptions(opts ...Option) *Options { + o := newDefaultOptions() // Loop through each option for _, opt := range opts {