From e814552ccaa42c91542bd6ac69cd5d59c3560a90 Mon Sep 17 00:00:00 2001 From: "Bo-Yi.Wu" Date: Sat, 29 Oct 2022 19:57:57 +0800 Subject: [PATCH 1/6] chore: update Signed-off-by: Bo-Yi.Wu --- consumer_test.go | 6 ++--- job.go | 40 ++++++++++++++++++++++++++++++++ job_option.go | 40 ++++++++++++++++++++++++++++++++ queue.go | 59 +++++++++++++++--------------------------------- queue_test.go | 4 ++-- 5 files changed, 103 insertions(+), 46 deletions(-) create mode 100644 job.go create mode 100644 job_option.go diff --git a/consumer_test.go b/consumer_test.go index b00371e..4aaa484 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -103,7 +103,7 @@ func TestJobReachTimeout(t *testing.T) { WithWorkerCount(2), ) assert.NoError(t, err) - assert.NoError(t, q.QueueWithTimeout(30*time.Millisecond, m)) + assert.NoError(t, q.Queue(m, WithTimeout(30*time.Millisecond))) q.Start() time.Sleep(50 * time.Millisecond) q.Release() @@ -137,8 +137,8 @@ func TestCancelJobAfterShutdown(t *testing.T) { WithWorkerCount(2), ) assert.NoError(t, err) - assert.NoError(t, q.QueueWithTimeout(100*time.Millisecond, m)) - assert.NoError(t, q.QueueWithTimeout(100*time.Millisecond, m)) + assert.NoError(t, q.Queue(m, WithTimeout(100*time.Millisecond))) + assert.NoError(t, q.Queue(m, WithTimeout(100*time.Millisecond))) q.Start() time.Sleep(10 * time.Millisecond) assert.Equal(t, 2, int(q.metric.busyWorkers)) diff --git a/job.go b/job.go new file mode 100644 index 0000000..f959950 --- /dev/null +++ b/job.go @@ -0,0 +1,40 @@ +package queue + +import ( + "time" + + "github.com/goccy/go-json" +) + +// Job describes a task and its metadata. +type Job struct { + Task TaskFunc `json:"-"` + + // Timeout is the duration the task can be processed by Handler. + // zero if not specified + Timeout time.Duration `json:"timeout"` + + // Payload is the payload data of the task. + Payload []byte `json:"body"` + + // RetryCount retry count if failure + RetryCount int64 `json:"retry_count"` + + // RetryCount retry count if failure + RetryDelay time.Duration `json:"retry_delay"` +} + +// Bytes get string body +func (j *Job) Bytes() []byte { + if j.Task != nil { + return nil + } + return j.Payload +} + +// Encode for encoding the structure +func (j *Job) encode() []byte { + b, _ := json.Marshal(j) + + return b +} diff --git a/job_option.go b/job_option.go new file mode 100644 index 0000000..b18085d --- /dev/null +++ b/job_option.go @@ -0,0 +1,40 @@ +package queue + +import "time" + +type JobConfig struct { + retryCount int64 + retryDelay time.Duration + timeout time.Duration +} + +// An JobOption configures a mutex. +type JobOption interface { + apply(*JobConfig) +} + +// JobOptionFunc is a function that configures a job. +type JobOptionFunc func(*JobConfig) + +// Apply calls f(option) +func (f JobOptionFunc) apply(option *JobConfig) { + f(option) +} + +func WithRetryCount(count int64) JobOption { + return JobOptionFunc(func(q *JobConfig) { + q.retryCount = count + }) +} + +func WithRetryDelay(t time.Duration) JobOption { + return JobOptionFunc(func(q *JobConfig) { + q.retryDelay = t + }) +} + +func WithTimeout(t time.Duration) JobOption { + return JobOptionFunc(func(q *JobConfig) { + q.timeout = t + }) +} diff --git a/queue.go b/queue.go index 1097637..a961cac 100644 --- a/queue.go +++ b/queue.go @@ -7,7 +7,6 @@ import ( "sync/atomic" "time" - "github.com/goccy/go-json" "github.com/golang-queue/queue/core" ) @@ -32,35 +31,8 @@ type ( timeout time.Duration stopFlag int32 } - - // Job describes a task and its metadata. - Job struct { - Task TaskFunc `json:"-"` - - // Timeout is the duration the task can be processed by Handler. - // zero if not specified - Timeout time.Duration `json:"timeout"` - - // Payload is the payload data of the task. - Payload []byte `json:"body"` - } ) -// Bytes get string body -func (j *Job) Bytes() []byte { - if j.Task != nil { - return nil - } - return j.Payload -} - -// Encode for encoding the structure -func (j *Job) Encode() []byte { - b, _ := json.Marshal(j) - - return b -} - // ErrMissingWorker missing define worker var ErrMissingWorker = errors.New("missing worker module") @@ -145,25 +117,30 @@ func (q *Queue) Wait() { } // Queue to queue all job -func (q *Queue) Queue(job core.QueuedMessage) error { - return q.handleQueue(q.timeout, job) -} - -// QueueWithTimeout to queue all job with specified timeout. -func (q *Queue) QueueWithTimeout(timeout time.Duration, job core.QueuedMessage) error { - return q.handleQueue(timeout, job) -} - -func (q *Queue) handleQueue(timeout time.Duration, job core.QueuedMessage) error { +func (q *Queue) Queue(job core.QueuedMessage, opts ...JobOption) error { if atomic.LoadInt32(&q.stopFlag) == 1 { return ErrQueueShutdown } + o := &JobConfig{ + retryCount: 0, + retryDelay: 100 * time.Millisecond, + timeout: q.timeout, + } + + // Loop through each option + for _, opt := range opts { + // Call the option giving the instantiated + opt.apply(o) + } + if err := q.worker.Queue(&Job{ Payload: (&Job{ - Timeout: timeout, - Payload: job.Bytes(), - }).Encode(), + RetryCount: o.retryCount, + RetryDelay: o.retryDelay, + Timeout: o.timeout, + Payload: job.Bytes(), + }).encode(), }); err != nil { return err } diff --git a/queue_test.go b/queue_test.go index 0bd9232..1433544 100644 --- a/queue_test.go +++ b/queue_test.go @@ -134,9 +134,9 @@ func TestCloseQueueAfterShutdown(t *testing.T) { }) assert.Error(t, err) assert.Equal(t, ErrQueueShutdown, err) - err = q.QueueWithTimeout(10*time.Millisecond, mockMessage{ + err = q.Queue(mockMessage{ message: "foobar", - }) + }, WithTimeout(10*time.Millisecond)) assert.Error(t, err) assert.Equal(t, ErrQueueShutdown, err) } From 66df276be3b08264ddeacbaf09cb424757bc11cb Mon Sep 17 00:00:00 2001 From: "Bo-Yi.Wu" Date: Sat, 29 Oct 2022 20:01:43 +0800 Subject: [PATCH 2/6] chore: update Signed-off-by: Bo-Yi.Wu --- .github/workflows/go.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index b24f773..66bbbe2 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -27,7 +27,7 @@ jobs: strategy: matrix: os: [ubuntu-latest] - go: [1.13, 1.14, 1.15, 1.16, 1.17, 1.18, 1.19] + go: [1.18, 1.19] include: - os: ubuntu-latest go-build: ~/.cache/go-build From aaa1241ba1ba84d2bc01cea83b50356ab9033e00 Mon Sep 17 00:00:00 2001 From: "Bo-Yi.Wu" Date: Sat, 29 Oct 2022 20:19:21 +0800 Subject: [PATCH 3/6] chore: move message to job folder Signed-off-by: Bo-Yi.Wu --- consumer.go | 15 ++++++------ consumer_test.go | 35 ++++++++++++++-------------- job.go => job/job.go | 20 +++++++++------- job/option.go | 55 ++++++++++++++++++++++++++++++++++++++++++++ job_option.go | 40 -------------------------------- queue.go | 37 ++++++++++++----------------- queue_test.go | 3 ++- worker_task.go | 3 ++- 8 files changed, 112 insertions(+), 96 deletions(-) rename job.go => job/job.go (64%) create mode 100644 job/option.go delete mode 100644 job_option.go diff --git a/consumer.go b/consumer.go index 29a0f79..b8c3b26 100644 --- a/consumer.go +++ b/consumer.go @@ -9,6 +9,7 @@ import ( "github.com/goccy/go-json" "github.com/golang-queue/queue/core" + "github.com/golang-queue/queue/job" ) var _ core.Worker = (*Consumer)(nil) @@ -26,12 +27,12 @@ type Consumer struct { stopFlag int32 } -func (s *Consumer) handle(job *Job) error { +func (s *Consumer) handle(m *job.Message) error { // create channel with buffer size 1 to avoid goroutine leak done := make(chan error, 1) panicChan := make(chan interface{}, 1) startTime := time.Now() - ctx, cancel := context.WithTimeout(context.Background(), job.Timeout) + ctx, cancel := context.WithTimeout(context.Background(), m.Timeout) defer func() { cancel() }() @@ -46,10 +47,10 @@ func (s *Consumer) handle(job *Job) error { }() // run custom process function - if job.Task != nil { - done <- job.Task(ctx) + if m.Task != nil { + done <- m.Task(ctx) } else { - done <- s.runFunc(ctx, job) + done <- s.runFunc(ctx, m) } }() @@ -62,7 +63,7 @@ func (s *Consumer) handle(job *Job) error { // cancel job cancel() - leftTime := job.Timeout - time.Since(startTime) + leftTime := m.Timeout - time.Since(startTime) // wait job select { case <-time.After(leftTime): @@ -79,7 +80,7 @@ func (s *Consumer) handle(job *Job) error { // Run to execute new task func (s *Consumer) Run(task core.QueuedMessage) error { - data := task.(*Job) + data := task.(*job.Message) if data.Task == nil { _ = json.Unmarshal(task.Bytes(), data) } diff --git a/consumer_test.go b/consumer_test.go index 4aaa484..a4e5626 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/golang-queue/queue/core" + "github.com/golang-queue/queue/job" "github.com/golang-queue/queue/mocks" "github.com/golang/mock/gomock" @@ -103,7 +104,7 @@ func TestJobReachTimeout(t *testing.T) { WithWorkerCount(2), ) assert.NoError(t, err) - assert.NoError(t, q.Queue(m, WithTimeout(30*time.Millisecond))) + assert.NoError(t, q.Queue(m, job.WithTimeout(30*time.Millisecond))) q.Start() time.Sleep(50 * time.Millisecond) q.Release() @@ -137,8 +138,8 @@ func TestCancelJobAfterShutdown(t *testing.T) { WithWorkerCount(2), ) assert.NoError(t, err) - assert.NoError(t, q.Queue(m, WithTimeout(100*time.Millisecond))) - assert.NoError(t, q.Queue(m, WithTimeout(100*time.Millisecond))) + assert.NoError(t, q.Queue(m, job.WithTimeout(100*time.Millisecond))) + assert.NoError(t, q.Queue(m, job.WithTimeout(100*time.Millisecond))) q.Start() time.Sleep(10 * time.Millisecond) assert.Equal(t, 2, int(q.metric.busyWorkers)) @@ -207,7 +208,7 @@ func TestGoroutinePanic(t *testing.T) { } func TestHandleTimeout(t *testing.T) { - job := &Job{ + m := &job.Message{ Timeout: 100 * time.Millisecond, Payload: []byte("foo"), } @@ -218,11 +219,11 @@ func TestHandleTimeout(t *testing.T) { }), ) - err := w.handle(job) + err := w.handle(m) assert.Error(t, err) assert.Equal(t, context.DeadlineExceeded, err) - job = &Job{ + m = &job.Message{ Timeout: 150 * time.Millisecond, Payload: []byte("foo"), } @@ -236,7 +237,7 @@ func TestHandleTimeout(t *testing.T) { done := make(chan error) go func() { - done <- w.handle(job) + done <- w.handle(m) }() err = <-done @@ -245,7 +246,7 @@ func TestHandleTimeout(t *testing.T) { } func TestJobComplete(t *testing.T) { - job := &Job{ + m := &job.Message{ Timeout: 100 * time.Millisecond, Payload: []byte("foo"), } @@ -255,11 +256,11 @@ func TestJobComplete(t *testing.T) { }), ) - err := w.handle(job) + err := w.handle(m) assert.Error(t, err) assert.Equal(t, errors.New("job completed"), err) - job = &Job{ + m = &job.Message{ Timeout: 250 * time.Millisecond, Payload: []byte("foo"), } @@ -273,7 +274,7 @@ func TestJobComplete(t *testing.T) { done := make(chan error) go func() { - done <- w.handle(job) + done <- w.handle(m) }() err = <-done @@ -282,7 +283,7 @@ func TestJobComplete(t *testing.T) { } func TestTaskJobComplete(t *testing.T) { - job := &Job{ + m := &job.Message{ Timeout: 100 * time.Millisecond, Task: func(ctx context.Context) error { return errors.New("job completed") @@ -290,11 +291,11 @@ func TestTaskJobComplete(t *testing.T) { } w := NewConsumer() - err := w.handle(job) + err := w.handle(m) assert.Error(t, err) assert.Equal(t, errors.New("job completed"), err) - job = &Job{ + m = &job.Message{ Timeout: 250 * time.Millisecond, Task: func(ctx context.Context) error { return nil @@ -304,21 +305,21 @@ func TestTaskJobComplete(t *testing.T) { w = NewConsumer() done := make(chan error) go func() { - done <- w.handle(job) + done <- w.handle(m) }() err = <-done assert.NoError(t, err) // job timeout - job = &Job{ + m = &job.Message{ Timeout: 50 * time.Millisecond, Task: func(ctx context.Context) error { time.Sleep(60 * time.Millisecond) return nil }, } - assert.Equal(t, context.DeadlineExceeded, w.handle(job)) + assert.Equal(t, context.DeadlineExceeded, w.handle(m)) } func TestIncreaseWorkerCount(t *testing.T) { diff --git a/job.go b/job/job.go similarity index 64% rename from job.go rename to job/job.go index f959950..bdfd016 100644 --- a/job.go +++ b/job/job.go @@ -1,13 +1,17 @@ -package queue +package job import ( + "context" "time" "github.com/goccy/go-json" ) -// Job describes a task and its metadata. -type Job struct { +// TaskFunc is the task function +type TaskFunc func(context.Context) error + +// Message describes a task and its metadata. +type Message struct { Task TaskFunc `json:"-"` // Timeout is the duration the task can be processed by Handler. @@ -25,16 +29,16 @@ type Job struct { } // Bytes get string body -func (j *Job) Bytes() []byte { - if j.Task != nil { +func (m *Message) Bytes() []byte { + if m.Task != nil { return nil } - return j.Payload + return m.Payload } // Encode for encoding the structure -func (j *Job) encode() []byte { - b, _ := json.Marshal(j) +func (m *Message) Encode() []byte { + b, _ := json.Marshal(m) return b } diff --git a/job/option.go b/job/option.go new file mode 100644 index 0000000..305db68 --- /dev/null +++ b/job/option.go @@ -0,0 +1,55 @@ +package job + +import "time" + +type Config struct { + RetryCount int64 + RetryDelay time.Duration + Timeout time.Duration +} + +// An Option configures a mutex. +type Option interface { + Apply(*Config) +} + +// OptionFunc is a function that configures a job. +type OptionFunc func(*Config) + +// Apply calls f(option) +func (f OptionFunc) Apply(option *Config) { + f(option) +} + +func DefaultOptions(opts ...Option) *Config { + o := &Config{ + RetryCount: 0, + RetryDelay: 100 * time.Millisecond, + } + + // Loop through each option + for _, opt := range opts { + // Call the option giving the instantiated + opt.Apply(o) + } + + return o +} + +func WithRetryCount(count int64) Option { + return OptionFunc(func(q *Config) { + q.RetryCount = count + }) +} + +func WithRetryDelay(t time.Duration) Option { + return OptionFunc(func(q *Config) { + q.RetryDelay = t + }) +} + +func WithTimeout(t time.Duration) Option { + return OptionFunc(func(q *Config) { + q.Timeout = t + }) +} diff --git a/job_option.go b/job_option.go deleted file mode 100644 index b18085d..0000000 --- a/job_option.go +++ /dev/null @@ -1,40 +0,0 @@ -package queue - -import "time" - -type JobConfig struct { - retryCount int64 - retryDelay time.Duration - timeout time.Duration -} - -// An JobOption configures a mutex. -type JobOption interface { - apply(*JobConfig) -} - -// JobOptionFunc is a function that configures a job. -type JobOptionFunc func(*JobConfig) - -// Apply calls f(option) -func (f JobOptionFunc) apply(option *JobConfig) { - f(option) -} - -func WithRetryCount(count int64) JobOption { - return JobOptionFunc(func(q *JobConfig) { - q.retryCount = count - }) -} - -func WithRetryDelay(t time.Duration) JobOption { - return JobOptionFunc(func(q *JobConfig) { - q.retryDelay = t - }) -} - -func WithTimeout(t time.Duration) JobOption { - return JobOptionFunc(func(q *JobConfig) { - q.timeout = t - }) -} diff --git a/queue.go b/queue.go index a961cac..012402c 100644 --- a/queue.go +++ b/queue.go @@ -1,21 +1,18 @@ package queue import ( - "context" "errors" "sync" "sync/atomic" "time" "github.com/golang-queue/queue/core" + "github.com/golang-queue/queue/job" ) // ErrQueueShutdown the queue is released and closed. var ErrQueueShutdown = errors.New("queue has been closed and released") -// TaskFunc is the task function -type TaskFunc func(context.Context) error - type ( // A Queue is a message queue. Queue struct { @@ -117,30 +114,26 @@ func (q *Queue) Wait() { } // Queue to queue all job -func (q *Queue) Queue(job core.QueuedMessage, opts ...JobOption) error { +func (q *Queue) Queue(m core.QueuedMessage, opts ...job.Option) error { if atomic.LoadInt32(&q.stopFlag) == 1 { return ErrQueueShutdown } - o := &JobConfig{ - retryCount: 0, - retryDelay: 100 * time.Millisecond, - timeout: q.timeout, - } + o := job.DefaultOptions(job.WithTimeout(q.timeout)) // Loop through each option for _, opt := range opts { // Call the option giving the instantiated - opt.apply(o) + opt.Apply(o) } - if err := q.worker.Queue(&Job{ - Payload: (&Job{ - RetryCount: o.retryCount, - RetryDelay: o.retryDelay, - Timeout: o.timeout, - Payload: job.Bytes(), - }).encode(), + if err := q.worker.Queue(&job.Message{ + Payload: (&job.Message{ + RetryCount: o.RetryCount, + RetryDelay: o.RetryDelay, + Timeout: o.Timeout, + Payload: m.Bytes(), + }).Encode(), }); err != nil { return err } @@ -151,21 +144,21 @@ func (q *Queue) Queue(job core.QueuedMessage, opts ...JobOption) error { } // QueueTask to queue job task -func (q *Queue) QueueTask(task TaskFunc) error { +func (q *Queue) QueueTask(task job.TaskFunc) error { return q.handleQueueTask(q.timeout, task) } // QueueTaskWithTimeout to queue job task with timeout -func (q *Queue) QueueTaskWithTimeout(timeout time.Duration, task TaskFunc) error { +func (q *Queue) QueueTaskWithTimeout(timeout time.Duration, task job.TaskFunc) error { return q.handleQueueTask(timeout, task) } -func (q *Queue) handleQueueTask(timeout time.Duration, task TaskFunc) error { +func (q *Queue) handleQueueTask(timeout time.Duration, task job.TaskFunc) error { if atomic.LoadInt32(&q.stopFlag) == 1 { return ErrQueueShutdown } - if err := q.worker.Queue(&Job{ + if err := q.worker.Queue(&job.Message{ Timeout: timeout, Task: task, }); err != nil { diff --git a/queue_test.go b/queue_test.go index 1433544..be541bc 100644 --- a/queue_test.go +++ b/queue_test.go @@ -5,6 +5,7 @@ import ( "time" "github.com/golang-queue/queue/core" + "github.com/golang-queue/queue/job" "github.com/golang-queue/queue/mocks" "github.com/golang/mock/gomock" @@ -136,7 +137,7 @@ func TestCloseQueueAfterShutdown(t *testing.T) { assert.Equal(t, ErrQueueShutdown, err) err = q.Queue(mockMessage{ message: "foobar", - }, WithTimeout(10*time.Millisecond)) + }, job.WithTimeout(10*time.Millisecond)) assert.Error(t, err) assert.Equal(t, ErrQueueShutdown, err) } diff --git a/worker_task.go b/worker_task.go index 88ce88e..1fea86b 100644 --- a/worker_task.go +++ b/worker_task.go @@ -5,6 +5,7 @@ import ( "errors" "github.com/golang-queue/queue/core" + "github.com/golang-queue/queue/job" ) var _ core.Worker = (*taskWorker)(nil) @@ -15,7 +16,7 @@ type taskWorker struct { } func (w *taskWorker) Run(task core.QueuedMessage) error { - if v, ok := task.(*Job); ok { + if v, ok := task.(*job.Message); ok { if v.Task != nil { _ = v.Task(context.Background()) } From 5deb3adefd226a657cf8f772686ae24b50206e9a Mon Sep 17 00:00:00 2001 From: "Bo-Yi.Wu" Date: Sat, 29 Oct 2022 20:25:30 +0800 Subject: [PATCH 4/6] chore: update Signed-off-by: Bo-Yi.Wu --- queue.go | 25 +++++++++++++------------ queue_example_test.go | 5 +++-- worker_task_test.go | 5 +++-- 3 files changed, 19 insertions(+), 16 deletions(-) diff --git a/queue.go b/queue.go index 012402c..0162956 100644 --- a/queue.go +++ b/queue.go @@ -144,23 +144,24 @@ func (q *Queue) Queue(m core.QueuedMessage, opts ...job.Option) error { } // QueueTask to queue job task -func (q *Queue) QueueTask(task job.TaskFunc) error { - return q.handleQueueTask(q.timeout, task) -} - -// QueueTaskWithTimeout to queue job task with timeout -func (q *Queue) QueueTaskWithTimeout(timeout time.Duration, task job.TaskFunc) error { - return q.handleQueueTask(timeout, task) -} - -func (q *Queue) handleQueueTask(timeout time.Duration, task job.TaskFunc) error { +func (q *Queue) QueueTask(task job.TaskFunc, opts ...job.Option) error { if atomic.LoadInt32(&q.stopFlag) == 1 { return ErrQueueShutdown } + o := job.DefaultOptions(job.WithTimeout(q.timeout)) + + // Loop through each option + for _, opt := range opts { + // Call the option giving the instantiated + opt.Apply(o) + } + if err := q.worker.Queue(&job.Message{ - Timeout: timeout, - Task: task, + Timeout: o.Timeout, + RetryCount: o.RetryCount, + RetryDelay: o.RetryDelay, + Task: task, }); err != nil { return err } diff --git a/queue_example_test.go b/queue_example_test.go index 850af5c..1383c1b 100644 --- a/queue_example_test.go +++ b/queue_example_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/golang-queue/queue" + "github.com/golang-queue/queue/job" ) func ExampleNewPool_queueTask() { @@ -57,7 +58,7 @@ func ExampleNewPool_queueTaskTimeout() { // assign tasks to asynchronous goroutine pool for i := 0; i < taskN; i++ { idx := i - if err := q.QueueTaskWithTimeout(100*time.Millisecond, func(ctx context.Context) error { + if err := q.QueueTask(func(ctx context.Context) error { // panic job if idx == 5 { panic("system error") @@ -74,7 +75,7 @@ func ExampleNewPool_queueTaskTimeout() { rets <- idx return nil - }); err != nil { + }, job.WithTimeout(100*time.Millisecond)); err != nil { log.Println(err) } } diff --git a/worker_task_test.go b/worker_task_test.go index 62280e5..1d8c2f5 100644 --- a/worker_task_test.go +++ b/worker_task_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/golang-queue/queue/core" + "github.com/golang-queue/queue/job" "github.com/stretchr/testify/assert" ) @@ -32,10 +33,10 @@ func TestQueueTaskJob(t *testing.T) { q.logger.Info("Add new task 2") return nil })) - assert.NoError(t, q.QueueTaskWithTimeout(50*time.Millisecond, func(ctx context.Context) error { + assert.NoError(t, q.QueueTask(func(ctx context.Context) error { time.Sleep(80 * time.Millisecond) return nil - })) + }, job.WithTimeout(50*time.Millisecond))) time.Sleep(50 * time.Millisecond) q.Shutdown() assert.Equal(t, ErrQueueShutdown, q.QueueTask(func(ctx context.Context) error { From 5a0c1061b74ba81ecb13e206487bd73a873e4737 Mon Sep 17 00:00:00 2001 From: "Bo-Yi.Wu" Date: Sat, 29 Oct 2022 21:20:58 +0800 Subject: [PATCH 5/6] chore: update Signed-off-by: Bo-Yi.Wu --- job/job.go | 33 +++++++++++++++++++++++++++++++++ job/option.go | 35 ++++++++++++++++++----------------- options.go | 11 ----------- queue.go | 32 ++++---------------------------- 4 files changed, 55 insertions(+), 56 deletions(-) diff --git a/job/job.go b/job/job.go index bdfd016..3d71834 100644 --- a/job/job.go +++ b/job/job.go @@ -5,6 +5,7 @@ import ( "time" "github.com/goccy/go-json" + "github.com/golang-queue/queue/core" ) // TaskFunc is the task function @@ -42,3 +43,35 @@ func (m *Message) Encode() []byte { return b } + +func NewMessage(m core.QueuedMessage, opts ...Option) *Message { + o := NewOptions(opts...) + // Loop through each option + for _, opt := range opts { + // Call the option giving the instantiated + opt.Apply(o) + } + + return &Message{ + RetryCount: o.retryCount, + RetryDelay: o.retryDelay, + Timeout: o.timeout, + Payload: m.Bytes(), + } +} + +func NewTask(task TaskFunc, opts ...Option) *Message { + o := NewOptions(opts...) + // Loop through each option + for _, opt := range opts { + // Call the option giving the instantiated + opt.Apply(o) + } + + return &Message{ + Timeout: o.timeout, + RetryCount: o.retryCount, + RetryDelay: o.retryDelay, + Task: task, + } +} diff --git a/job/option.go b/job/option.go index 305db68..d1b1155 100644 --- a/job/option.go +++ b/job/option.go @@ -2,29 +2,30 @@ package job import "time" -type Config struct { - RetryCount int64 - RetryDelay time.Duration - Timeout time.Duration +type Options struct { + retryCount int64 + retryDelay time.Duration + timeout time.Duration } // An Option configures a mutex. type Option interface { - Apply(*Config) + Apply(*Options) } // OptionFunc is a function that configures a job. -type OptionFunc func(*Config) +type OptionFunc func(*Options) // Apply calls f(option) -func (f OptionFunc) Apply(option *Config) { +func (f OptionFunc) Apply(option *Options) { f(option) } -func DefaultOptions(opts ...Option) *Config { - o := &Config{ - RetryCount: 0, - RetryDelay: 100 * time.Millisecond, +func NewOptions(opts ...Option) *Options { + o := &Options{ + retryCount: 0, + retryDelay: 100 * time.Millisecond, + timeout: 60 * time.Minute, } // Loop through each option @@ -37,19 +38,19 @@ func DefaultOptions(opts ...Option) *Config { } func WithRetryCount(count int64) Option { - return OptionFunc(func(q *Config) { - q.RetryCount = count + return OptionFunc(func(o *Options) { + o.retryCount = count }) } func WithRetryDelay(t time.Duration) Option { - return OptionFunc(func(q *Config) { - q.RetryDelay = t + return OptionFunc(func(o *Options) { + o.retryDelay = t }) } func WithTimeout(t time.Duration) Option { - return OptionFunc(func(q *Config) { - q.Timeout = t + return OptionFunc(func(o *Options) { + o.timeout = t }) } diff --git a/options.go b/options.go index 873bdcc..ba0b51a 100644 --- a/options.go +++ b/options.go @@ -3,7 +3,6 @@ package queue import ( "context" "runtime" - "time" "github.com/golang-queue/queue/core" ) @@ -11,7 +10,6 @@ import ( var ( defaultQueueSize = 4096 defaultWorkerCount = runtime.NumCPU() - defaultTimeout = 60 * time.Minute defaultNewLogger = NewLogger() defaultFn = func(context.Context, core.QueuedMessage) error { return nil } defaultMetric = NewMetric() @@ -72,17 +70,9 @@ func WithFn(fn func(context.Context, core.QueuedMessage) error) Option { }) } -// WithTimeOut set custom timeout -func WithTimeOut(t time.Duration) Option { - return OptionFunc(func(q *Options) { - q.timeout = t - }) -} - // Options for custom args in Queue type Options struct { workerCount int - timeout time.Duration logger Logger queueSize int worker core.Worker @@ -95,7 +85,6 @@ func NewOptions(opts ...Option) *Options { o := &Options{ workerCount: defaultWorkerCount, queueSize: defaultQueueSize, - timeout: defaultTimeout, logger: defaultNewLogger, worker: nil, fn: defaultFn, diff --git a/queue.go b/queue.go index 0162956..46a5d12 100644 --- a/queue.go +++ b/queue.go @@ -25,7 +25,6 @@ type ( ready chan struct{} worker core.Worker stopOnce sync.Once - timeout time.Duration stopFlag int32 } ) @@ -42,7 +41,6 @@ func NewQueue(opts ...Option) (*Queue, error) { ready: make(chan struct{}, 1), workerCount: o.workerCount, logger: o.logger, - timeout: o.timeout, worker: o.worker, metric: &metric{}, } @@ -119,21 +117,10 @@ func (q *Queue) Queue(m core.QueuedMessage, opts ...job.Option) error { return ErrQueueShutdown } - o := job.DefaultOptions(job.WithTimeout(q.timeout)) - - // Loop through each option - for _, opt := range opts { - // Call the option giving the instantiated - opt.Apply(o) - } + message := job.NewMessage(m, opts...) if err := q.worker.Queue(&job.Message{ - Payload: (&job.Message{ - RetryCount: o.RetryCount, - RetryDelay: o.RetryDelay, - Timeout: o.Timeout, - Payload: m.Bytes(), - }).Encode(), + Payload: message.Encode(), }); err != nil { return err } @@ -149,20 +136,9 @@ func (q *Queue) QueueTask(task job.TaskFunc, opts ...job.Option) error { return ErrQueueShutdown } - o := job.DefaultOptions(job.WithTimeout(q.timeout)) - - // Loop through each option - for _, opt := range opts { - // Call the option giving the instantiated - opt.Apply(o) - } + message := job.NewTask(task, opts...) - if err := q.worker.Queue(&job.Message{ - Timeout: o.Timeout, - RetryCount: o.RetryCount, - RetryDelay: o.RetryDelay, - Task: task, - }); err != nil { + if err := q.worker.Queue(message); err != nil { return err } From cfbdc458f74e78d9a15a05a45eae47caa61dc2b6 Mon Sep 17 00:00:00 2001 From: "Bo-Yi.Wu" Date: Sat, 29 Oct 2022 21:23:06 +0800 Subject: [PATCH 6/6] chore: update Signed-off-by: Bo-Yi.Wu --- job/job.go | 10 ---------- job/option.go | 8 ++++---- 2 files changed, 4 insertions(+), 14 deletions(-) diff --git a/job/job.go b/job/job.go index 3d71834..c0b3dee 100644 --- a/job/job.go +++ b/job/job.go @@ -46,11 +46,6 @@ func (m *Message) Encode() []byte { func NewMessage(m core.QueuedMessage, opts ...Option) *Message { o := NewOptions(opts...) - // Loop through each option - for _, opt := range opts { - // Call the option giving the instantiated - opt.Apply(o) - } return &Message{ RetryCount: o.retryCount, @@ -62,11 +57,6 @@ func NewMessage(m core.QueuedMessage, opts ...Option) *Message { func NewTask(task TaskFunc, opts ...Option) *Message { o := NewOptions(opts...) - // Loop through each option - for _, opt := range opts { - // Call the option giving the instantiated - opt.Apply(o) - } return &Message{ Timeout: o.timeout, diff --git a/job/option.go b/job/option.go index d1b1155..175c660 100644 --- a/job/option.go +++ b/job/option.go @@ -10,14 +10,14 @@ type Options struct { // An Option configures a mutex. type Option interface { - Apply(*Options) + apply(*Options) } // OptionFunc is a function that configures a job. type OptionFunc func(*Options) -// Apply calls f(option) -func (f OptionFunc) Apply(option *Options) { +// apply calls f(option) +func (f OptionFunc) apply(option *Options) { f(option) } @@ -31,7 +31,7 @@ func NewOptions(opts ...Option) *Options { // Loop through each option for _, opt := range opts { // Call the option giving the instantiated - opt.Apply(o) + opt.apply(o) } return o