Skip to content

feat(metric): add success, failure and submitted tasks #48

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,21 @@ type Metric interface {
IncBusyWorker()
DecBusyWorker()
BusyWorkers() uint64
SuccessTasks() uint64
FailureTasks() uint64
SubmittedTasks() uint64
IncSuccessTask()
IncFailureTask()
IncSubmittedTask()
}

var _ Metric = (*metric)(nil)

type metric struct {
busyWorkers uint64
busyWorkers uint64
successTasks uint64
failureTasks uint64
submittedTasks uint64
}

// NewMetric for default metric structure
Expand All @@ -29,3 +40,27 @@ func (m *metric) DecBusyWorker() {
func (m *metric) BusyWorkers() uint64 {
return atomic.LoadUint64(&m.busyWorkers)
}

func (m *metric) IncSuccessTask() {
atomic.AddUint64(&m.successTasks, 1)
}

func (m *metric) IncFailureTask() {
atomic.AddUint64(&m.failureTasks, 1)
}

func (m *metric) IncSubmittedTask() {
atomic.AddUint64(&m.submittedTasks, 1)
}

func (m *metric) SuccessTasks() uint64 {
return atomic.LoadUint64(&m.successTasks)
}

func (m *metric) FailureTasks() uint64 {
return atomic.LoadUint64(&m.failureTasks)
}

func (m *metric) SubmittedTasks() uint64 {
return atomic.LoadUint64(&m.submittedTasks)
}
49 changes: 49 additions & 0 deletions metric_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package queue

import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestMetricData(t *testing.T) {
w := NewConsumer(
WithFn(func(ctx context.Context, m QueuedMessage) error {
switch string(m.Bytes()) {
case "foo1":
panic("missing something")
case "foo2":
return errors.New("missing something")
case "foo3":
return nil
}
return nil
}),
)
q, err := NewQueue(
WithWorker(w),
WithWorkerCount(4),
)
assert.NoError(t, err)
assert.NoError(t, q.Queue(mockMessage{
message: "foo1",
}))
assert.NoError(t, q.Queue(mockMessage{
message: "foo2",
}))
assert.NoError(t, q.Queue(mockMessage{
message: "foo3",
}))
assert.NoError(t, q.Queue(mockMessage{
message: "foo4",
}))
q.Start()
time.Sleep(50 * time.Millisecond)
assert.Equal(t, 4, q.SubmittedTasks())
assert.Equal(t, 2, q.SuccessTasks())
assert.Equal(t, 2, q.FailureTasks())
q.Release()
}
86 changes: 56 additions & 30 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,6 @@ func NewQueue(opts ...Option) (*Queue, error) {
return q, nil
}

// Capacity for queue max size
func (q *Queue) Capacity() int {
return q.worker.Capacity()
}

// Usage for count of queue usage
func (q *Queue) Usage() int {
return q.worker.Usage()
}

// Start to enable all worker
func (q *Queue) Start() {
go q.start()
Expand Down Expand Up @@ -123,24 +113,24 @@ func (q *Queue) BusyWorkers() int {
return int(q.metric.BusyWorkers())
}

// Wait all process
func (q *Queue) Wait() {
q.routineGroup.Wait()
// BusyWorkers returns the numbers of success tasks.
func (q *Queue) SuccessTasks() int {
return int(q.metric.SuccessTasks())
}

func (q *Queue) handleQueue(timeout time.Duration, job QueuedMessage) error {
if atomic.LoadInt32(&q.stopFlag) == 1 {
return ErrQueueShutdown
}
// BusyWorkers returns the numbers of failure tasks.
func (q *Queue) FailureTasks() int {
return int(q.metric.FailureTasks())
}

data := Job{
Timeout: timeout,
Payload: job.Bytes(),
}
// BusyWorkers returns the numbers of submitted tasks.
func (q *Queue) SubmittedTasks() int {
return int(q.metric.SubmittedTasks())
}

return q.worker.Queue(Job{
Payload: data.Encode(),
})
// Wait all process
func (q *Queue) Wait() {
q.routineGroup.Wait()
}

// Queue to queue all job
Expand All @@ -153,19 +143,25 @@ func (q *Queue) QueueWithTimeout(timeout time.Duration, job QueuedMessage) error
return q.handleQueue(timeout, job)
}

func (q *Queue) handleQueueTask(timeout time.Duration, task TaskFunc) error {
func (q *Queue) handleQueue(timeout time.Duration, job QueuedMessage) error {
if atomic.LoadInt32(&q.stopFlag) == 1 {
return ErrQueueShutdown
}

data := Job{
Timeout: timeout,
Payload: job.Bytes(),
}

return q.worker.Queue(Job{
Task: task,
if err := q.worker.Queue(Job{
Payload: data.Encode(),
})
}); err != nil {
return err
}

q.metric.IncSubmittedTask()

return nil
}

// QueueTask to queue job task
Expand All @@ -178,18 +174,48 @@ func (q *Queue) QueueTaskWithTimeout(timeout time.Duration, task TaskFunc) error
return q.handleQueueTask(timeout, task)
}

func (q *Queue) handleQueueTask(timeout time.Duration, task TaskFunc) error {
if atomic.LoadInt32(&q.stopFlag) == 1 {
return ErrQueueShutdown
}

data := Job{
Timeout: timeout,
}

if err := q.worker.Queue(Job{
Task: task,
Payload: data.Encode(),
}); err != nil {
return err
}

q.metric.IncSubmittedTask()

return nil
}

func (q *Queue) work(task QueuedMessage) {
var err error
// to handle panic cases from inside the worker
// in such case, we start a new goroutine
defer func() {
q.metric.DecBusyWorker()
if err := recover(); err != nil {
e := recover()
if e != nil {
q.logger.Errorf("panic error: %v", err)
}
q.schedule()

// increase success or failure number
if err == nil && e == nil {
q.metric.IncSuccessTask()
} else {
q.metric.IncFailureTask()
}
}()

if err := q.worker.Run(task); err != nil {
if err = q.worker.Run(task); err != nil {
q.logger.Errorf("runtime error: %s", err.Error())
}
}
Expand Down
28 changes: 1 addition & 27 deletions queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestNewQueue(t *testing.T) {
assert.NotNil(t, q)

q.Start()
assert.Equal(t, uint64(0), w.BusyWorkers())
assert.Equal(t, 0, q.BusyWorkers())
q.Shutdown()
q.Wait()
}
Expand All @@ -59,31 +59,6 @@ func TestShtdonwOnce(t *testing.T) {
assert.Equal(t, 0, q.BusyWorkers())
}

func TestWorkerStatus(t *testing.T) {
m := mockMessage{
message: "foobar",
}
w := &messageWorker{
messages: make(chan QueuedMessage, 100),
}
q, err := NewQueue(
WithWorker(w),
WithWorkerCount(2),
)
assert.NoError(t, err)
assert.NotNil(t, q)

assert.NoError(t, q.Queue(m))
assert.NoError(t, q.Queue(m))
assert.NoError(t, q.QueueWithTimeout(10*time.Millisecond, m))
assert.NoError(t, q.QueueWithTimeout(10*time.Millisecond, m))
assert.Equal(t, 100, q.Capacity())
assert.Equal(t, 4, q.Usage())
q.Start()
time.Sleep(40 * time.Millisecond)
q.Release()
}

func TestCapacityReached(t *testing.T) {
w := &messageWorker{
messages: make(chan QueuedMessage, 1),
Expand Down Expand Up @@ -160,7 +135,6 @@ func TestQueueTaskJob(t *testing.T) {
return nil
}))
time.Sleep(50 * time.Millisecond)
assert.Equal(t, uint64(0), w.BusyWorkers())
q.Shutdown()
assert.Equal(t, ErrQueueShutdown, q.QueueTask(func(ctx context.Context) error {
return nil
Expand Down
6 changes: 0 additions & 6 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,6 @@ type Worker interface {
Queue(task QueuedMessage) error
// Request to get message from Queue
Request() (QueuedMessage, error)
// Capacity queue capacity = cap(channel name)
Capacity() int
// Usage is how many message in queue
Usage() int
// BusyWorkers return count of busy worker currently
BusyWorkers() uint64
}

// QueuedMessage ...
Expand Down
3 changes: 0 additions & 3 deletions worker_empty.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,3 @@ func (w *emptyWorker) Run(task QueuedMessage) error { return nil }
func (w *emptyWorker) Shutdown() error { return nil }
func (w *emptyWorker) Queue(task QueuedMessage) error { return nil }
func (w *emptyWorker) Request() (QueuedMessage, error) { return nil, nil }
func (w *emptyWorker) Capacity() int { return 0 }
func (w *emptyWorker) Usage() int { return 0 }
func (w *emptyWorker) BusyWorkers() uint64 { return uint64(0) }
4 changes: 0 additions & 4 deletions worker_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,3 @@ func (w *messageWorker) Request() (QueuedMessage, error) {
return nil, errors.New("no message in queue")
}
}

func (w *messageWorker) Capacity() int { return cap(w.messages) }
func (w *messageWorker) Usage() int { return len(w.messages) }
func (w *messageWorker) BusyWorkers() uint64 { return uint64(0) }
4 changes: 0 additions & 4 deletions worker_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,3 @@ func (w *taskWorker) Request() (QueuedMessage, error) {
return nil, errors.New("no message in queue")
}
}

func (w *taskWorker) Capacity() int { return cap(w.messages) }
func (w *taskWorker) Usage() int { return len(w.messages) }
func (w *taskWorker) BusyWorkers() uint64 { return uint64(0) }