diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 9c1169b..aecad16 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -58,6 +58,7 @@ jobs: - name: Run Tests run: | go test -v -covermode=atomic -coverprofile=coverage.out + go test -v -run=^$ -benchmem -bench . - name: Upload coverage to Codecov uses: codecov/codecov-action@v3 diff --git a/consumer.go b/consumer.go index 56ec765..29a0f79 100644 --- a/consumer.go +++ b/consumer.go @@ -2,12 +2,12 @@ package queue import ( "context" - "encoding/json" "errors" "sync" "sync/atomic" "time" + "github.com/goccy/go-json" "github.com/golang-queue/queue/core" ) @@ -26,7 +26,7 @@ type Consumer struct { stopFlag int32 } -func (s *Consumer) handle(job Job) error { +func (s *Consumer) handle(job *Job) error { // create channel with buffer size 1 to avoid goroutine leak done := make(chan error, 1) panicChan := make(chan interface{}, 1) @@ -79,13 +79,11 @@ func (s *Consumer) handle(job Job) error { // Run to execute new task func (s *Consumer) Run(task core.QueuedMessage) error { - var data Job - _ = json.Unmarshal(task.Bytes(), &data) - if v, ok := task.(Job); ok { - if v.Task != nil { - data.Task = v.Task - } + data := task.(*Job) + if data.Task == nil { + _ = json.Unmarshal(task.Bytes(), data) } + if err := s.handle(data); err != nil { return err } diff --git a/consumer_test.go b/consumer_test.go index 7ea1fac..52d2f6c 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -207,7 +207,7 @@ func TestGoroutinePanic(t *testing.T) { } func TestHandleTimeout(t *testing.T) { - job := Job{ + job := &Job{ Timeout: 100 * time.Millisecond, Payload: []byte("foo"), } @@ -222,7 +222,7 @@ func TestHandleTimeout(t *testing.T) { assert.Error(t, err) assert.Equal(t, context.DeadlineExceeded, err) - job = Job{ + job = &Job{ Timeout: 150 * time.Millisecond, Payload: []byte("foo"), } @@ -245,7 +245,7 @@ func TestHandleTimeout(t *testing.T) { } func TestJobComplete(t *testing.T) { - job := Job{ + job := &Job{ Timeout: 100 * time.Millisecond, Payload: []byte("foo"), } @@ -259,7 +259,7 @@ func TestJobComplete(t *testing.T) { assert.Error(t, err) assert.Equal(t, errors.New("job completed"), err) - job = Job{ + job = &Job{ Timeout: 250 * time.Millisecond, Payload: []byte("foo"), } @@ -282,7 +282,7 @@ func TestJobComplete(t *testing.T) { } func TestTaskJobComplete(t *testing.T) { - job := Job{ + job := &Job{ Timeout: 100 * time.Millisecond, Task: func(ctx context.Context) error { return errors.New("job completed") @@ -294,7 +294,7 @@ func TestTaskJobComplete(t *testing.T) { assert.Error(t, err) assert.Equal(t, errors.New("job completed"), err) - job = Job{ + job = &Job{ Timeout: 250 * time.Millisecond, Task: func(ctx context.Context) error { return nil @@ -311,7 +311,7 @@ func TestTaskJobComplete(t *testing.T) { assert.NoError(t, err) // job timeout - job = Job{ + job = &Job{ Timeout: 50 * time.Millisecond, Task: func(ctx context.Context) error { time.Sleep(60 * time.Millisecond) diff --git a/go.mod b/go.mod index 94dfa70..41c0266 100644 --- a/go.mod +++ b/go.mod @@ -3,13 +3,14 @@ module github.com/golang-queue/queue go 1.18 require ( + github.com/goccy/go-json v0.9.7 github.com/golang/mock v1.6.0 github.com/stretchr/testify v1.7.1 go.uber.org/goleak v1.1.12 ) require ( - github.com/davecgh/go-spew v1.1.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect ) diff --git a/go.sum b/go.sum index 9320fa5..9cf4631 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,8 @@ -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/goccy/go-json v0.9.7 h1:IcB+Aqpx/iMHu5Yooh7jEzJk1JZ7Pjtmys2ukPr7EeM= +github.com/goccy/go-json v0.9.7/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= diff --git a/queue.go b/queue.go index 7c17d1b..1097637 100644 --- a/queue.go +++ b/queue.go @@ -2,12 +2,12 @@ package queue import ( "context" - "encoding/json" "errors" "sync" "sync/atomic" "time" + "github.com/goccy/go-json" "github.com/golang-queue/queue/core" ) @@ -47,13 +47,17 @@ type ( ) // Bytes get string body -func (j Job) Bytes() []byte { +func (j *Job) Bytes() []byte { + if j.Task != nil { + return nil + } return j.Payload } // Encode for encoding the structure -func (j Job) Encode() []byte { +func (j *Job) Encode() []byte { b, _ := json.Marshal(j) + return b } @@ -97,11 +101,11 @@ func (q *Queue) Shutdown() { return } - if q.metric.BusyWorkers() > 0 { - q.logger.Infof("shutdown all tasks: %d workers", q.metric.BusyWorkers()) - } - q.stopOnce.Do(func() { + if q.metric.BusyWorkers() > 0 { + q.logger.Infof("shutdown all tasks: %d workers", q.metric.BusyWorkers()) + } + if err := q.worker.Shutdown(); err != nil { q.logger.Error(err) } @@ -155,13 +159,11 @@ func (q *Queue) handleQueue(timeout time.Duration, job core.QueuedMessage) error return ErrQueueShutdown } - data := Job{ - Timeout: timeout, - Payload: job.Bytes(), - } - - if err := q.worker.Queue(Job{ - Payload: data.Encode(), + if err := q.worker.Queue(&Job{ + Payload: (&Job{ + Timeout: timeout, + Payload: job.Bytes(), + }).Encode(), }); err != nil { return err } @@ -186,13 +188,9 @@ func (q *Queue) handleQueueTask(timeout time.Duration, task TaskFunc) error { return ErrQueueShutdown } - data := Job{ + if err := q.worker.Queue(&Job{ Timeout: timeout, - } - - if err := q.worker.Queue(Job{ Task: task, - Payload: data.Encode(), }); err != nil { return err } diff --git a/queue_test.go b/queue_test.go index 0bd9232..cb7581f 100644 --- a/queue_test.go +++ b/queue_test.go @@ -1,6 +1,7 @@ package queue import ( + "context" "testing" "time" @@ -140,3 +141,26 @@ func TestCloseQueueAfterShutdown(t *testing.T) { assert.Error(t, err) assert.Equal(t, ErrQueueShutdown, err) } + +func BenchmarkQueueTask(b *testing.B) { + b.ReportAllocs() + q := NewPool(5) + defer q.Release() + for n := 0; n < b.N; n++ { + _ = q.QueueTask(func(context.Context) error { + return nil + }) + } +} + +func BenchmarkQueue(b *testing.B) { + b.ReportAllocs() + m := &mockMessage{ + message: "foo", + } + q := NewPool(5) + defer q.Release() + for n := 0; n < b.N; n++ { + _ = q.Queue(m) + } +} diff --git a/worker_task.go b/worker_task.go index 0edf79e..88ce88e 100644 --- a/worker_task.go +++ b/worker_task.go @@ -15,7 +15,7 @@ type taskWorker struct { } func (w *taskWorker) Run(task core.QueuedMessage) error { - if v, ok := task.(Job); ok { + if v, ok := task.(*Job); ok { if v.Task != nil { _ = v.Task(context.Background()) }