From 49894bf9a85a767e0564f8f8cc1ce75bb2bf60f0 Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Sun, 29 May 2022 12:12:15 +0800 Subject: [PATCH 1/4] test(benchmark): add queue performance check --- .github/workflows/go.yml | 1 + queue_test.go | 23 +++++++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 9c1169b..aecad16 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -58,6 +58,7 @@ jobs: - name: Run Tests run: | go test -v -covermode=atomic -coverprofile=coverage.out + go test -v -run=^$ -benchmem -bench . - name: Upload coverage to Codecov uses: codecov/codecov-action@v3 diff --git a/queue_test.go b/queue_test.go index 0bd9232..f0edc14 100644 --- a/queue_test.go +++ b/queue_test.go @@ -1,6 +1,7 @@ package queue import ( + "context" "testing" "time" @@ -140,3 +141,25 @@ func TestCloseQueueAfterShutdown(t *testing.T) { assert.Error(t, err) assert.Equal(t, ErrQueueShutdown, err) } + +func BenchmarkQueueTask(b *testing.B) { + q := NewPool(5) + q.Release() + for n := 0; n < b.N; n++ { + _ = q.QueueTask(func(context.Context) error { + time.Sleep(10 * time.Millisecond) + return nil + }) + } +} + +func BenchmarkQueue(b *testing.B) { + m := &mockMessage{ + message: "foo", + } + q := NewPool(5) + q.Release() + for n := 0; n < b.N; n++ { + _ = q.Queue(m) + } +} From cf6a656fbce967d12ff29aed2a0e18b60ef91e5d Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Sun, 29 May 2022 12:17:54 +0800 Subject: [PATCH 2/4] fix: defer Signed-off-by: Bo-Yi Wu --- queue_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/queue_test.go b/queue_test.go index f0edc14..a3cdc81 100644 --- a/queue_test.go +++ b/queue_test.go @@ -144,7 +144,7 @@ func TestCloseQueueAfterShutdown(t *testing.T) { func BenchmarkQueueTask(b *testing.B) { q := NewPool(5) - q.Release() + defer q.Release() for n := 0; n < b.N; n++ { _ = q.QueueTask(func(context.Context) error { time.Sleep(10 * time.Millisecond) @@ -158,7 +158,7 @@ func BenchmarkQueue(b *testing.B) { message: "foo", } q := NewPool(5) - q.Release() + defer q.Release() for n := 0; n < b.N; n++ { _ = q.Queue(m) } From 5d2819ea5e4b77e8dd7d8c915e9d2ab4c8370deb Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Sun, 29 May 2022 14:46:57 +0800 Subject: [PATCH 3/4] chore: add pointer Signed-off-by: Bo-Yi Wu --- consumer.go | 19 ++++++++++++------- consumer_test.go | 14 +++++++------- queue.go | 15 ++++++++------- 3 files changed, 27 insertions(+), 21 deletions(-) diff --git a/consumer.go b/consumer.go index 56ec765..df4e40f 100644 --- a/consumer.go +++ b/consumer.go @@ -26,7 +26,7 @@ type Consumer struct { stopFlag int32 } -func (s *Consumer) handle(job Job) error { +func (s *Consumer) handle(job *Job) error { // create channel with buffer size 1 to avoid goroutine leak done := make(chan error, 1) panicChan := make(chan interface{}, 1) @@ -79,13 +79,18 @@ func (s *Consumer) handle(job Job) error { // Run to execute new task func (s *Consumer) Run(task core.QueuedMessage) error { - var data Job - _ = json.Unmarshal(task.Bytes(), &data) - if v, ok := task.(Job); ok { - if v.Task != nil { - data.Task = v.Task - } + // var data Job + // _ = json.Unmarshal(task.Bytes(), &data) + // if v, ok := task.(Job); ok { + // if v.Task != nil { + // data.Task = v.Task + // } + // } + data := task.(*Job) + if data.Task == nil { + _ = json.Unmarshal(task.Bytes(), data) } + if err := s.handle(data); err != nil { return err } diff --git a/consumer_test.go b/consumer_test.go index 7ea1fac..52d2f6c 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -207,7 +207,7 @@ func TestGoroutinePanic(t *testing.T) { } func TestHandleTimeout(t *testing.T) { - job := Job{ + job := &Job{ Timeout: 100 * time.Millisecond, Payload: []byte("foo"), } @@ -222,7 +222,7 @@ func TestHandleTimeout(t *testing.T) { assert.Error(t, err) assert.Equal(t, context.DeadlineExceeded, err) - job = Job{ + job = &Job{ Timeout: 150 * time.Millisecond, Payload: []byte("foo"), } @@ -245,7 +245,7 @@ func TestHandleTimeout(t *testing.T) { } func TestJobComplete(t *testing.T) { - job := Job{ + job := &Job{ Timeout: 100 * time.Millisecond, Payload: []byte("foo"), } @@ -259,7 +259,7 @@ func TestJobComplete(t *testing.T) { assert.Error(t, err) assert.Equal(t, errors.New("job completed"), err) - job = Job{ + job = &Job{ Timeout: 250 * time.Millisecond, Payload: []byte("foo"), } @@ -282,7 +282,7 @@ func TestJobComplete(t *testing.T) { } func TestTaskJobComplete(t *testing.T) { - job := Job{ + job := &Job{ Timeout: 100 * time.Millisecond, Task: func(ctx context.Context) error { return errors.New("job completed") @@ -294,7 +294,7 @@ func TestTaskJobComplete(t *testing.T) { assert.Error(t, err) assert.Equal(t, errors.New("job completed"), err) - job = Job{ + job = &Job{ Timeout: 250 * time.Millisecond, Task: func(ctx context.Context) error { return nil @@ -311,7 +311,7 @@ func TestTaskJobComplete(t *testing.T) { assert.NoError(t, err) // job timeout - job = Job{ + job = &Job{ Timeout: 50 * time.Millisecond, Task: func(ctx context.Context) error { time.Sleep(60 * time.Millisecond) diff --git a/queue.go b/queue.go index 7c17d1b..023c7fd 100644 --- a/queue.go +++ b/queue.go @@ -48,6 +48,9 @@ type ( // Bytes get string body func (j Job) Bytes() []byte { + if j.Task != nil { + return nil + } return j.Payload } @@ -155,12 +158,12 @@ func (q *Queue) handleQueue(timeout time.Duration, job core.QueuedMessage) error return ErrQueueShutdown } - data := Job{ + data := &Job{ Timeout: timeout, Payload: job.Bytes(), } - if err := q.worker.Queue(Job{ + if err := q.worker.Queue(&Job{ Payload: data.Encode(), }); err != nil { return err @@ -186,14 +189,12 @@ func (q *Queue) handleQueueTask(timeout time.Duration, task TaskFunc) error { return ErrQueueShutdown } - data := Job{ + data := &Job{ Timeout: timeout, + Task: task, } - if err := q.worker.Queue(Job{ - Task: task, - Payload: data.Encode(), - }); err != nil { + if err := q.worker.Queue(data); err != nil { return err } From 15c43eca41b777539a59eff02f47a1274c6d02b3 Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Sun, 29 May 2022 22:52:53 +0800 Subject: [PATCH 4/4] chore: update json library Signed-off-by: Bo-Yi Wu --- consumer.go | 9 +-------- go.mod | 3 ++- go.sum | 5 ++++- queue.go | 31 ++++++++++++++----------------- queue_test.go | 3 ++- worker_task.go | 2 +- 6 files changed, 24 insertions(+), 29 deletions(-) diff --git a/consumer.go b/consumer.go index df4e40f..29a0f79 100644 --- a/consumer.go +++ b/consumer.go @@ -2,12 +2,12 @@ package queue import ( "context" - "encoding/json" "errors" "sync" "sync/atomic" "time" + "github.com/goccy/go-json" "github.com/golang-queue/queue/core" ) @@ -79,13 +79,6 @@ func (s *Consumer) handle(job *Job) error { // Run to execute new task func (s *Consumer) Run(task core.QueuedMessage) error { - // var data Job - // _ = json.Unmarshal(task.Bytes(), &data) - // if v, ok := task.(Job); ok { - // if v.Task != nil { - // data.Task = v.Task - // } - // } data := task.(*Job) if data.Task == nil { _ = json.Unmarshal(task.Bytes(), data) diff --git a/go.mod b/go.mod index 94dfa70..41c0266 100644 --- a/go.mod +++ b/go.mod @@ -3,13 +3,14 @@ module github.com/golang-queue/queue go 1.18 require ( + github.com/goccy/go-json v0.9.7 github.com/golang/mock v1.6.0 github.com/stretchr/testify v1.7.1 go.uber.org/goleak v1.1.12 ) require ( - github.com/davecgh/go-spew v1.1.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect ) diff --git a/go.sum b/go.sum index 9320fa5..9cf4631 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,8 @@ -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/goccy/go-json v0.9.7 h1:IcB+Aqpx/iMHu5Yooh7jEzJk1JZ7Pjtmys2ukPr7EeM= +github.com/goccy/go-json v0.9.7/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= diff --git a/queue.go b/queue.go index 023c7fd..1097637 100644 --- a/queue.go +++ b/queue.go @@ -2,12 +2,12 @@ package queue import ( "context" - "encoding/json" "errors" "sync" "sync/atomic" "time" + "github.com/goccy/go-json" "github.com/golang-queue/queue/core" ) @@ -47,7 +47,7 @@ type ( ) // Bytes get string body -func (j Job) Bytes() []byte { +func (j *Job) Bytes() []byte { if j.Task != nil { return nil } @@ -55,8 +55,9 @@ func (j Job) Bytes() []byte { } // Encode for encoding the structure -func (j Job) Encode() []byte { +func (j *Job) Encode() []byte { b, _ := json.Marshal(j) + return b } @@ -100,11 +101,11 @@ func (q *Queue) Shutdown() { return } - if q.metric.BusyWorkers() > 0 { - q.logger.Infof("shutdown all tasks: %d workers", q.metric.BusyWorkers()) - } - q.stopOnce.Do(func() { + if q.metric.BusyWorkers() > 0 { + q.logger.Infof("shutdown all tasks: %d workers", q.metric.BusyWorkers()) + } + if err := q.worker.Shutdown(); err != nil { q.logger.Error(err) } @@ -158,13 +159,11 @@ func (q *Queue) handleQueue(timeout time.Duration, job core.QueuedMessage) error return ErrQueueShutdown } - data := &Job{ - Timeout: timeout, - Payload: job.Bytes(), - } - if err := q.worker.Queue(&Job{ - Payload: data.Encode(), + Payload: (&Job{ + Timeout: timeout, + Payload: job.Bytes(), + }).Encode(), }); err != nil { return err } @@ -189,12 +188,10 @@ func (q *Queue) handleQueueTask(timeout time.Duration, task TaskFunc) error { return ErrQueueShutdown } - data := &Job{ + if err := q.worker.Queue(&Job{ Timeout: timeout, Task: task, - } - - if err := q.worker.Queue(data); err != nil { + }); err != nil { return err } diff --git a/queue_test.go b/queue_test.go index a3cdc81..cb7581f 100644 --- a/queue_test.go +++ b/queue_test.go @@ -143,17 +143,18 @@ func TestCloseQueueAfterShutdown(t *testing.T) { } func BenchmarkQueueTask(b *testing.B) { + b.ReportAllocs() q := NewPool(5) defer q.Release() for n := 0; n < b.N; n++ { _ = q.QueueTask(func(context.Context) error { - time.Sleep(10 * time.Millisecond) return nil }) } } func BenchmarkQueue(b *testing.B) { + b.ReportAllocs() m := &mockMessage{ message: "foo", } diff --git a/worker_task.go b/worker_task.go index 0edf79e..88ce88e 100644 --- a/worker_task.go +++ b/worker_task.go @@ -15,7 +15,7 @@ type taskWorker struct { } func (w *taskWorker) Run(task core.QueuedMessage) error { - if v, ok := task.(Job); ok { + if v, ok := task.(*Job); ok { if v.Task != nil { _ = v.Task(context.Background()) }