Skip to content

feat(mock): support mock testing #62

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ import (
"sync"
"sync/atomic"
"time"

"github.com/golang-queue/queue/core"
)

var _ Worker = (*Consumer)(nil)
var _ core.Worker = (*Consumer)(nil)

var errMaxCapacity = errors.New("max capacity reached")

// Consumer for simple queue using buffer channel
type Consumer struct {
taskQueue chan QueuedMessage
runFunc func(context.Context, QueuedMessage) error
taskQueue chan core.QueuedMessage
runFunc func(context.Context, core.QueuedMessage) error
stop chan struct{}
logger Logger
stopOnce sync.Once
Expand Down Expand Up @@ -75,7 +77,7 @@ func (s *Consumer) handle(job Job) error {
}

// Run to execute new task
func (s *Consumer) Run(task QueuedMessage) error {
func (s *Consumer) Run(task core.QueuedMessage) error {
var data Job
_ = json.Unmarshal(task.Bytes(), &data)
if v, ok := task.(Job); ok {
Expand Down Expand Up @@ -104,7 +106,7 @@ func (s *Consumer) Shutdown() error {
}

// Queue send task to the buffer channel
func (s *Consumer) Queue(task QueuedMessage) error {
func (s *Consumer) Queue(task core.QueuedMessage) error {
if atomic.LoadInt32(&s.stopFlag) == 1 {
return ErrQueueShutdown
}
Expand All @@ -118,7 +120,7 @@ func (s *Consumer) Queue(task QueuedMessage) error {
}

// Request a new task from channel
func (s *Consumer) Request() (QueuedMessage, error) {
func (s *Consumer) Request() (core.QueuedMessage, error) {
clock := 0
loop:
for {
Expand All @@ -143,7 +145,7 @@ loop:
func NewConsumer(opts ...Option) *Consumer {
o := NewOptions(opts...)
w := &Consumer{
taskQueue: make(chan QueuedMessage, o.queueSize),
taskQueue: make(chan core.QueuedMessage, o.queueSize),
stop: make(chan struct{}),
logger: o.logger,
runFunc: o.fn,
Expand Down
24 changes: 13 additions & 11 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"testing"
"time"

"github.com/golang-queue/queue/core"

"github.com/stretchr/testify/assert"
)

Expand All @@ -28,7 +30,7 @@ func TestCustomFuncAndWait(t *testing.T) {
message: "foo",
}
w := NewConsumer(
WithFn(func(ctx context.Context, m QueuedMessage) error {
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
time.Sleep(500 * time.Millisecond)
return nil
}),
Expand Down Expand Up @@ -77,7 +79,7 @@ func TestJobReachTimeout(t *testing.T) {
message: "foo",
}
w := NewConsumer(
WithFn(func(ctx context.Context, m QueuedMessage) error {
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -111,7 +113,7 @@ func TestCancelJobAfterShutdown(t *testing.T) {
}
w := NewConsumer(
WithLogger(NewEmptyLogger()),
WithFn(func(ctx context.Context, m QueuedMessage) error {
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -144,7 +146,7 @@ func TestCancelJobAfterShutdown(t *testing.T) {
func TestGoroutineLeak(t *testing.T) {
w := NewConsumer(
WithLogger(NewLogger()),
WithFn(func(ctx context.Context, m QueuedMessage) error {
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -187,7 +189,7 @@ func TestGoroutinePanic(t *testing.T) {
message: "foo",
}
w := NewConsumer(
WithFn(func(ctx context.Context, m QueuedMessage) error {
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
panic("missing something")
}),
)
Expand All @@ -208,7 +210,7 @@ func TestHandleTimeout(t *testing.T) {
Payload: []byte("foo"),
}
w := NewConsumer(
WithFn(func(ctx context.Context, m QueuedMessage) error {
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
time.Sleep(200 * time.Millisecond)
return nil
}),
Expand All @@ -224,7 +226,7 @@ func TestHandleTimeout(t *testing.T) {
}

w = NewConsumer(
WithFn(func(ctx context.Context, m QueuedMessage) error {
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
time.Sleep(200 * time.Millisecond)
return nil
}),
Expand All @@ -248,7 +250,7 @@ func TestJobComplete(t *testing.T) {
Payload: []byte("foo"),
}
w := NewConsumer(
WithFn(func(ctx context.Context, m QueuedMessage) error {
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
return errors.New("job completed")
}),
)
Expand All @@ -263,7 +265,7 @@ func TestJobComplete(t *testing.T) {
}

w = NewConsumer(
WithFn(func(ctx context.Context, m QueuedMessage) error {
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
time.Sleep(200 * time.Millisecond)
return errors.New("job completed")
}),
Expand Down Expand Up @@ -324,7 +326,7 @@ func TestTaskJobComplete(t *testing.T) {
func TestIncreaseWorkerCount(t *testing.T) {
w := NewConsumer(
WithLogger(NewEmptyLogger()),
WithFn(func(ctx context.Context, m QueuedMessage) error {
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
time.Sleep(500 * time.Millisecond)
return nil
}),
Expand Down Expand Up @@ -354,7 +356,7 @@ func TestIncreaseWorkerCount(t *testing.T) {

func TestDecreaseWorkerCount(t *testing.T) {
w := NewConsumer(
WithFn(func(ctx context.Context, m QueuedMessage) error {
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
time.Sleep(100 * time.Millisecond)
return nil
}),
Expand Down
2 changes: 1 addition & 1 deletion worker.go → core/worker.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package queue
package core

// Worker interface
type Worker interface {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/golang-queue/queue
go 1.18

require (
github.com/golang/mock v1.6.0
github.com/stretchr/testify v1.7.1
go.uber.org/goleak v1.1.12
)
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
Expand Down Expand Up @@ -36,6 +38,7 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
4 changes: 3 additions & 1 deletion metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ import (
"testing"
"time"

"github.com/golang-queue/queue/core"

"github.com/stretchr/testify/assert"
)

func TestMetricData(t *testing.T) {
w := NewConsumer(
WithFn(func(ctx context.Context, m QueuedMessage) error {
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
switch string(m.Bytes()) {
case "foo1":
panic("missing something")
Expand Down
48 changes: 48 additions & 0 deletions mocks/mock_message.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

92 changes: 92 additions & 0 deletions mocks/mock_worker.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions mocks/mocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package mocks

import _ "github.com/golang/mock/mockgen/model"

//go:generate mockgen -package=mocks -destination=mock_worker.go github.com/golang-queue/queue/core Worker
//go:generate mockgen -package=mocks -destination=mock_message.go github.com/golang-queue/queue/core QueuedMessage
Loading