Skip to content

chore(job): support custom options #85

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Oct 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
go: [1.13, 1.14, 1.15, 1.16, 1.17, 1.18, 1.19]
go: [1.18, 1.19]
include:
- os: ubuntu-latest
go-build: ~/.cache/go-build
Expand Down
15 changes: 8 additions & 7 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/goccy/go-json"
"github.com/golang-queue/queue/core"
"github.com/golang-queue/queue/job"
)

var _ core.Worker = (*Consumer)(nil)
Expand All @@ -26,12 +27,12 @@ type Consumer struct {
stopFlag int32
}

func (s *Consumer) handle(job *Job) error {
func (s *Consumer) handle(m *job.Message) error {
// create channel with buffer size 1 to avoid goroutine leak
done := make(chan error, 1)
panicChan := make(chan interface{}, 1)
startTime := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), job.Timeout)
ctx, cancel := context.WithTimeout(context.Background(), m.Timeout)
defer func() {
cancel()
}()
Expand All @@ -46,10 +47,10 @@ func (s *Consumer) handle(job *Job) error {
}()

// run custom process function
if job.Task != nil {
done <- job.Task(ctx)
if m.Task != nil {
done <- m.Task(ctx)
} else {
done <- s.runFunc(ctx, job)
done <- s.runFunc(ctx, m)
}
}()

Expand All @@ -62,7 +63,7 @@ func (s *Consumer) handle(job *Job) error {
// cancel job
cancel()

leftTime := job.Timeout - time.Since(startTime)
leftTime := m.Timeout - time.Since(startTime)
// wait job
select {
case <-time.After(leftTime):
Expand All @@ -79,7 +80,7 @@ func (s *Consumer) handle(job *Job) error {

// Run to execute new task
func (s *Consumer) Run(task core.QueuedMessage) error {
data := task.(*Job)
data := task.(*job.Message)
if data.Task == nil {
_ = json.Unmarshal(task.Bytes(), data)
}
Expand Down
35 changes: 18 additions & 17 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/golang-queue/queue/core"
"github.com/golang-queue/queue/job"
"github.com/golang-queue/queue/mocks"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -103,7 +104,7 @@ func TestJobReachTimeout(t *testing.T) {
WithWorkerCount(2),
)
assert.NoError(t, err)
assert.NoError(t, q.QueueWithTimeout(30*time.Millisecond, m))
assert.NoError(t, q.Queue(m, job.WithTimeout(30*time.Millisecond)))
q.Start()
time.Sleep(50 * time.Millisecond)
q.Release()
Expand Down Expand Up @@ -137,8 +138,8 @@ func TestCancelJobAfterShutdown(t *testing.T) {
WithWorkerCount(2),
)
assert.NoError(t, err)
assert.NoError(t, q.QueueWithTimeout(100*time.Millisecond, m))
assert.NoError(t, q.QueueWithTimeout(100*time.Millisecond, m))
assert.NoError(t, q.Queue(m, job.WithTimeout(100*time.Millisecond)))
assert.NoError(t, q.Queue(m, job.WithTimeout(100*time.Millisecond)))
q.Start()
time.Sleep(10 * time.Millisecond)
assert.Equal(t, 2, int(q.metric.busyWorkers))
Expand Down Expand Up @@ -207,7 +208,7 @@ func TestGoroutinePanic(t *testing.T) {
}

func TestHandleTimeout(t *testing.T) {
job := &Job{
m := &job.Message{
Timeout: 100 * time.Millisecond,
Payload: []byte("foo"),
}
Expand All @@ -218,11 +219,11 @@ func TestHandleTimeout(t *testing.T) {
}),
)

err := w.handle(job)
err := w.handle(m)
assert.Error(t, err)
assert.Equal(t, context.DeadlineExceeded, err)

job = &Job{
m = &job.Message{
Timeout: 150 * time.Millisecond,
Payload: []byte("foo"),
}
Expand All @@ -236,7 +237,7 @@ func TestHandleTimeout(t *testing.T) {

done := make(chan error)
go func() {
done <- w.handle(job)
done <- w.handle(m)
}()

err = <-done
Expand All @@ -245,7 +246,7 @@ func TestHandleTimeout(t *testing.T) {
}

func TestJobComplete(t *testing.T) {
job := &Job{
m := &job.Message{
Timeout: 100 * time.Millisecond,
Payload: []byte("foo"),
}
Expand All @@ -255,11 +256,11 @@ func TestJobComplete(t *testing.T) {
}),
)

err := w.handle(job)
err := w.handle(m)
assert.Error(t, err)
assert.Equal(t, errors.New("job completed"), err)

job = &Job{
m = &job.Message{
Timeout: 250 * time.Millisecond,
Payload: []byte("foo"),
}
Expand All @@ -273,7 +274,7 @@ func TestJobComplete(t *testing.T) {

done := make(chan error)
go func() {
done <- w.handle(job)
done <- w.handle(m)
}()

err = <-done
Expand All @@ -282,19 +283,19 @@ func TestJobComplete(t *testing.T) {
}

func TestTaskJobComplete(t *testing.T) {
job := &Job{
m := &job.Message{
Timeout: 100 * time.Millisecond,
Task: func(ctx context.Context) error {
return errors.New("job completed")
},
}
w := NewConsumer()

err := w.handle(job)
err := w.handle(m)
assert.Error(t, err)
assert.Equal(t, errors.New("job completed"), err)

job = &Job{
m = &job.Message{
Timeout: 250 * time.Millisecond,
Task: func(ctx context.Context) error {
return nil
Expand All @@ -304,21 +305,21 @@ func TestTaskJobComplete(t *testing.T) {
w = NewConsumer()
done := make(chan error)
go func() {
done <- w.handle(job)
done <- w.handle(m)
}()

err = <-done
assert.NoError(t, err)

// job timeout
job = &Job{
m = &job.Message{
Timeout: 50 * time.Millisecond,
Task: func(ctx context.Context) error {
time.Sleep(60 * time.Millisecond)
return nil
},
}
assert.Equal(t, context.DeadlineExceeded, w.handle(job))
assert.Equal(t, context.DeadlineExceeded, w.handle(m))
}

func TestIncreaseWorkerCount(t *testing.T) {
Expand Down
67 changes: 67 additions & 0 deletions job/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package job

import (
"context"
"time"

"github.com/goccy/go-json"
"github.com/golang-queue/queue/core"
)

// TaskFunc is the task function
type TaskFunc func(context.Context) error

// Message describes a task and its metadata.
type Message struct {
Task TaskFunc `json:"-"`

// Timeout is the duration the task can be processed by Handler.
// zero if not specified
Timeout time.Duration `json:"timeout"`

// Payload is the payload data of the task.
Payload []byte `json:"body"`

// RetryCount retry count if failure
RetryCount int64 `json:"retry_count"`

// RetryCount retry count if failure
RetryDelay time.Duration `json:"retry_delay"`
}

// Bytes get string body
func (m *Message) Bytes() []byte {
if m.Task != nil {
return nil
}
return m.Payload
}

// Encode for encoding the structure
func (m *Message) Encode() []byte {
b, _ := json.Marshal(m)

return b
}

func NewMessage(m core.QueuedMessage, opts ...Option) *Message {
o := NewOptions(opts...)

return &Message{
RetryCount: o.retryCount,
RetryDelay: o.retryDelay,
Timeout: o.timeout,
Payload: m.Bytes(),
}
}

func NewTask(task TaskFunc, opts ...Option) *Message {
o := NewOptions(opts...)

return &Message{
Timeout: o.timeout,
RetryCount: o.retryCount,
RetryDelay: o.retryDelay,
Task: task,
}
}
56 changes: 56 additions & 0 deletions job/option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package job

import "time"

type Options struct {
retryCount int64
retryDelay time.Duration
timeout time.Duration
}

// An Option configures a mutex.
type Option interface {
apply(*Options)
}

// OptionFunc is a function that configures a job.
type OptionFunc func(*Options)

// apply calls f(option)
func (f OptionFunc) apply(option *Options) {
f(option)
}

func NewOptions(opts ...Option) *Options {
o := &Options{
retryCount: 0,
retryDelay: 100 * time.Millisecond,
timeout: 60 * time.Minute,
}

// Loop through each option
for _, opt := range opts {
// Call the option giving the instantiated
opt.apply(o)
}

return o
}

func WithRetryCount(count int64) Option {
return OptionFunc(func(o *Options) {
o.retryCount = count
})
}

func WithRetryDelay(t time.Duration) Option {
return OptionFunc(func(o *Options) {
o.retryDelay = t
})
}

func WithTimeout(t time.Duration) Option {
return OptionFunc(func(o *Options) {
o.timeout = t
})
}
11 changes: 0 additions & 11 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ package queue
import (
"context"
"runtime"
"time"

"github.com/golang-queue/queue/core"
)

var (
defaultQueueSize = 4096
defaultWorkerCount = runtime.NumCPU()
defaultTimeout = 60 * time.Minute
defaultNewLogger = NewLogger()
defaultFn = func(context.Context, core.QueuedMessage) error { return nil }
defaultMetric = NewMetric()
Expand Down Expand Up @@ -72,17 +70,9 @@ func WithFn(fn func(context.Context, core.QueuedMessage) error) Option {
})
}

// WithTimeOut set custom timeout
func WithTimeOut(t time.Duration) Option {
return OptionFunc(func(q *Options) {
q.timeout = t
})
}

// Options for custom args in Queue
type Options struct {
workerCount int
timeout time.Duration
logger Logger
queueSize int
worker core.Worker
Expand All @@ -95,7 +85,6 @@ func NewOptions(opts ...Option) *Options {
o := &Options{
workerCount: defaultWorkerCount,
queueSize: defaultQueueSize,
timeout: defaultTimeout,
logger: defaultNewLogger,
worker: nil,
fn: defaultFn,
Expand Down
Loading