Skip to content

test(benchmark): add queue performance check #72

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ jobs:
- name: Run Tests
run: |
go test -v -covermode=atomic -coverprofile=coverage.out
go test -v -run=^$ -benchmem -bench .

- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
Expand Down
14 changes: 6 additions & 8 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package queue

import (
"context"
"encoding/json"
"errors"
"sync"
"sync/atomic"
"time"

"github.com/goccy/go-json"
"github.com/golang-queue/queue/core"
)

Expand All @@ -26,7 +26,7 @@ type Consumer struct {
stopFlag int32
}

func (s *Consumer) handle(job Job) error {
func (s *Consumer) handle(job *Job) error {
// create channel with buffer size 1 to avoid goroutine leak
done := make(chan error, 1)
panicChan := make(chan interface{}, 1)
Expand Down Expand Up @@ -79,13 +79,11 @@ func (s *Consumer) handle(job Job) error {

// Run to execute new task
func (s *Consumer) Run(task core.QueuedMessage) error {
var data Job
_ = json.Unmarshal(task.Bytes(), &data)
if v, ok := task.(Job); ok {
if v.Task != nil {
data.Task = v.Task
}
data := task.(*Job)
if data.Task == nil {
_ = json.Unmarshal(task.Bytes(), data)
}

if err := s.handle(data); err != nil {
return err
}
Expand Down
14 changes: 7 additions & 7 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func TestGoroutinePanic(t *testing.T) {
}

func TestHandleTimeout(t *testing.T) {
job := Job{
job := &Job{
Timeout: 100 * time.Millisecond,
Payload: []byte("foo"),
}
Expand All @@ -222,7 +222,7 @@ func TestHandleTimeout(t *testing.T) {
assert.Error(t, err)
assert.Equal(t, context.DeadlineExceeded, err)

job = Job{
job = &Job{
Timeout: 150 * time.Millisecond,
Payload: []byte("foo"),
}
Expand All @@ -245,7 +245,7 @@ func TestHandleTimeout(t *testing.T) {
}

func TestJobComplete(t *testing.T) {
job := Job{
job := &Job{
Timeout: 100 * time.Millisecond,
Payload: []byte("foo"),
}
Expand All @@ -259,7 +259,7 @@ func TestJobComplete(t *testing.T) {
assert.Error(t, err)
assert.Equal(t, errors.New("job completed"), err)

job = Job{
job = &Job{
Timeout: 250 * time.Millisecond,
Payload: []byte("foo"),
}
Expand All @@ -282,7 +282,7 @@ func TestJobComplete(t *testing.T) {
}

func TestTaskJobComplete(t *testing.T) {
job := Job{
job := &Job{
Timeout: 100 * time.Millisecond,
Task: func(ctx context.Context) error {
return errors.New("job completed")
Expand All @@ -294,7 +294,7 @@ func TestTaskJobComplete(t *testing.T) {
assert.Error(t, err)
assert.Equal(t, errors.New("job completed"), err)

job = Job{
job = &Job{
Timeout: 250 * time.Millisecond,
Task: func(ctx context.Context) error {
return nil
Expand All @@ -311,7 +311,7 @@ func TestTaskJobComplete(t *testing.T) {
assert.NoError(t, err)

// job timeout
job = Job{
job = &Job{
Timeout: 50 * time.Millisecond,
Task: func(ctx context.Context) error {
time.Sleep(60 * time.Millisecond)
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ module github.com/golang-queue/queue
go 1.18

require (
github.com/goccy/go-json v0.9.7
github.com/golang/mock v1.6.0
github.com/stretchr/testify v1.7.1
go.uber.org/goleak v1.1.12
)

require (
github.com/davecgh/go-spew v1.1.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
)
5 changes: 4 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/goccy/go-json v0.9.7 h1:IcB+Aqpx/iMHu5Yooh7jEzJk1JZ7Pjtmys2ukPr7EeM=
github.com/goccy/go-json v0.9.7/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
Expand Down
36 changes: 17 additions & 19 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package queue

import (
"context"
"encoding/json"
"errors"
"sync"
"sync/atomic"
"time"

"github.com/goccy/go-json"
"github.com/golang-queue/queue/core"
)

Expand Down Expand Up @@ -47,13 +47,17 @@ type (
)

// Bytes get string body
func (j Job) Bytes() []byte {
func (j *Job) Bytes() []byte {
if j.Task != nil {
return nil
}
return j.Payload
}

// Encode for encoding the structure
func (j Job) Encode() []byte {
func (j *Job) Encode() []byte {
b, _ := json.Marshal(j)

return b
}

Expand Down Expand Up @@ -97,11 +101,11 @@ func (q *Queue) Shutdown() {
return
}

if q.metric.BusyWorkers() > 0 {
q.logger.Infof("shutdown all tasks: %d workers", q.metric.BusyWorkers())
}

q.stopOnce.Do(func() {
if q.metric.BusyWorkers() > 0 {
q.logger.Infof("shutdown all tasks: %d workers", q.metric.BusyWorkers())
}

if err := q.worker.Shutdown(); err != nil {
q.logger.Error(err)
}
Expand Down Expand Up @@ -155,13 +159,11 @@ func (q *Queue) handleQueue(timeout time.Duration, job core.QueuedMessage) error
return ErrQueueShutdown
}

data := Job{
Timeout: timeout,
Payload: job.Bytes(),
}

if err := q.worker.Queue(Job{
Payload: data.Encode(),
if err := q.worker.Queue(&Job{
Payload: (&Job{
Timeout: timeout,
Payload: job.Bytes(),
}).Encode(),
}); err != nil {
return err
}
Expand All @@ -186,13 +188,9 @@ func (q *Queue) handleQueueTask(timeout time.Duration, task TaskFunc) error {
return ErrQueueShutdown
}

data := Job{
if err := q.worker.Queue(&Job{
Timeout: timeout,
}

if err := q.worker.Queue(Job{
Task: task,
Payload: data.Encode(),
}); err != nil {
return err
}
Expand Down
24 changes: 24 additions & 0 deletions queue_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package queue

import (
"context"
"testing"
"time"

Expand Down Expand Up @@ -140,3 +141,26 @@ func TestCloseQueueAfterShutdown(t *testing.T) {
assert.Error(t, err)
assert.Equal(t, ErrQueueShutdown, err)
}

func BenchmarkQueueTask(b *testing.B) {
b.ReportAllocs()
q := NewPool(5)
defer q.Release()
for n := 0; n < b.N; n++ {
_ = q.QueueTask(func(context.Context) error {
return nil
})
}
}

func BenchmarkQueue(b *testing.B) {
b.ReportAllocs()
m := &mockMessage{
message: "foo",
}
q := NewPool(5)
defer q.Release()
for n := 0; n < b.N; n++ {
_ = q.Queue(m)
}
}
2 changes: 1 addition & 1 deletion worker_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type taskWorker struct {
}

func (w *taskWorker) Run(task core.QueuedMessage) error {
if v, ok := task.(Job); ok {
if v, ok := task.(*Job); ok {
if v.Task != nil {
_ = v.Task(context.Background())
}
Expand Down