Skip to content

chore(consumer): support request timeout flag #90

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 14 additions & 19 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type Consumer struct {
logger Logger
stopOnce sync.Once
stopFlag int32

requestTimeout time.Duration
}

func (s *Consumer) handle(m *job.Message) error {
Expand Down Expand Up @@ -139,28 +141,19 @@ func (s *Consumer) Queue(task core.QueuedMessage) error {

// Request a new task from channel
func (s *Consumer) Request() (core.QueuedMessage, error) {
clock := 0
loop:
for {
select {
case task, ok := <-s.taskQueue:
if !ok {
select {
case s.exit <- struct{}{}:
default:
}
return nil, ErrQueueHasBeenClosed
}
return task, nil
case <-time.After(1 * time.Second):
if clock == 5 {
break loop
select {
case task, ok := <-s.taskQueue:
if !ok {
select {
case s.exit <- struct{}{}:
default:
}
clock += 1
return nil, ErrQueueHasBeenClosed
}
return task, nil
case <-time.After(s.requestTimeout):
return nil, ErrNoTaskInQueue
}

return nil, ErrNoTaskInQueue
}

// NewConsumer for create new consumer instance
Expand All @@ -172,6 +165,8 @@ func NewConsumer(opts ...Option) *Consumer {
exit: make(chan struct{}),
logger: o.logger,
runFunc: o.fn,

requestTimeout: o.requestTimeout,
}

return w
Expand Down
13 changes: 13 additions & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,3 +603,16 @@ func TestCancelRetryCountWithNewMessage(t *testing.T) {
assert.Len(t, messages, 0)
assert.Equal(t, 2, count)
}

func TestRequestTimeout(t *testing.T) {
w := NewConsumer(
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
return nil
}),
WithRequestTimeout(10*time.Millisecond),
)
task, err := w.Request()
assert.Nil(t, task)
assert.Error(t, err)
assert.Equal(t, ErrNoTaskInQueue, err)
}
24 changes: 19 additions & 5 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@ package queue
import (
"context"
"runtime"
"time"

"github.com/golang-queue/queue/core"
)

var (
defaultQueueSize = 4096
defaultWorkerCount = runtime.NumCPU()
defaultNewLogger = NewLogger()
defaultFn = func(context.Context, core.QueuedMessage) error { return nil }
defaultMetric = NewMetric()
defaultQueueSize = 4096
defaultWorkerCount = runtime.NumCPU()
defaultNewLogger = NewLogger()
defaultFn = func(context.Context, core.QueuedMessage) error { return nil }
defaultMetric = NewMetric()
defaultRequestTimeout = 5 * time.Second
)

// An Option configures a mutex.
Expand Down Expand Up @@ -42,6 +44,13 @@ func WithQueueSize(num int) Option {
})
}

// WithQueueSize set worker count
func WithRequestTimeout(timeout time.Duration) Option {
return OptionFunc(func(q *Options) {
q.requestTimeout = timeout
})
}

// WithLogger set custom logger
func WithLogger(l Logger) Option {
return OptionFunc(func(q *Options) {
Expand Down Expand Up @@ -78,6 +87,9 @@ type Options struct {
worker core.Worker
fn func(context.Context, core.QueuedMessage) error
metric Metric

// timeout for request single task
requestTimeout time.Duration
}

// NewOptions initialize the default value for the options
Expand All @@ -89,6 +101,8 @@ func NewOptions(opts ...Option) *Options {
worker: nil,
fn: defaultFn,
metric: defaultMetric,

requestTimeout: defaultRequestTimeout,
}

// Loop through each option
Expand Down