From c850bacb96dc27b9c3f82d2335833ad9c0bd3280 Mon Sep 17 00:00:00 2001 From: "Bo-Yi.Wu" Date: Sun, 15 Jan 2023 18:19:39 +0800 Subject: [PATCH 1/3] chore: add benchmark testing Signed-off-by: Bo-Yi.Wu --- benchmark_test.go | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/benchmark_test.go b/benchmark_test.go index bccea3b..ed207ee 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -9,6 +9,43 @@ import ( "github.com/golang-queue/queue/job" ) +var count = 1 + +type testqueue interface { + Queue(task core.QueuedMessage) error + Request() (core.QueuedMessage, error) +} + +func testQueue(b *testing.B, pool testqueue) { + message := job.NewTask(func(context.Context) error { + return nil + }, + job.AllowOption{ + RetryCount: job.Int64(100), + RetryDelay: job.Time(30 * time.Millisecond), + Timeout: job.Time(3 * time.Millisecond), + }, + ) + + b.ReportAllocs() + b.ResetTimer() + for n := 0; n < b.N; n++ { + for i := 0; i < count; i++ { + _ = pool.Queue(message) + _, _ = pool.Request() + } + } +} + +func BenchmarkNewCusumer(b *testing.B) { + pool := NewConsumer( + WithQueueSize(b.N*count), + WithLogger(emptyLogger{}), + ) + + testQueue(b, pool) +} + func BenchmarkQueueTask(b *testing.B) { b.ReportAllocs() q := NewPool(5, WithLogger(emptyLogger{})) From 987e0e601300feda8516bc0f3c7cdb4c5e3da0de Mon Sep 17 00:00:00 2001 From: "Bo-Yi.Wu" Date: Sun, 15 Jan 2023 18:25:43 +0800 Subject: [PATCH 2/3] chore(queue): add ring buffer queue Signed-off-by: Bo-Yi.Wu --- consumer.go | 75 ++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 51 insertions(+), 24 deletions(-) diff --git a/consumer.go b/consumer.go index 874b357..952f81d 100644 --- a/consumer.go +++ b/consumer.go @@ -15,9 +15,12 @@ var errMaxCapacity = errors.New("max capacity reached") // Consumer for simple queue using buffer channel type Consumer struct { - taskQueue chan core.QueuedMessage + taskQueue []core.QueuedMessage runFunc func(context.Context, core.QueuedMessage) error - stop chan struct{} + capacity int + count int + head int + tail int exit chan struct{} logger Logger stopOnce sync.Once @@ -36,9 +39,7 @@ func (s *Consumer) Shutdown() error { } s.stopOnce.Do(func() { - close(s.stop) - close(s.taskQueue) - if len(s.taskQueue) > 0 { + if s.count > 0 { <-s.exit } }) @@ -46,42 +47,68 @@ func (s *Consumer) Shutdown() error { } // Queue send task to the buffer channel -func (s *Consumer) Queue(task core.QueuedMessage) error { +func (s *Consumer) Queue(task core.QueuedMessage) error { //nolint:stylecheck if atomic.LoadInt32(&s.stopFlag) == 1 { return ErrQueueShutdown } - - select { - case s.taskQueue <- task: - return nil - default: + if s.count >= s.capacity { return errMaxCapacity } + + if s.count == len(s.taskQueue) { + s.resize(s.count * 2) + } + s.taskQueue[s.tail] = task + s.tail = (s.tail + 1) % len(s.taskQueue) + s.count++ + + return nil } // Request a new task from channel func (s *Consumer) Request() (core.QueuedMessage, error) { - select { - case task, ok := <-s.taskQueue: - if !ok { - select { - case s.exit <- struct{}{}: - default: - } - return nil, ErrQueueHasBeenClosed + if atomic.LoadInt32(&s.stopFlag) == 1 && s.count == 0 { + select { + case s.exit <- struct{}{}: + default: } - return task, nil - default: + return nil, ErrQueueHasBeenClosed + } + + if s.count == 0 { return nil, ErrNoTaskInQueue } + data := s.taskQueue[s.head] + s.head = (s.head + 1) % len(s.taskQueue) + s.count-- + + if n := len(s.taskQueue) / 2; n > 2 && s.count <= n { + s.resize(n) + } + + return data, nil +} + +func (q *Consumer) resize(n int) { + nodes := make([]core.QueuedMessage, n) + if q.head < q.tail { + copy(nodes, q.taskQueue[q.head:q.tail]) + } else { + copy(nodes, q.taskQueue[q.head:]) + copy(nodes[len(q.taskQueue)-q.head:], q.taskQueue[:q.tail]) + } + + q.tail = q.count % n + q.head = 0 + q.taskQueue = nodes } -// NewConsumer for create new consumer instance +// NewConsumer for create new Consumer instance func NewConsumer(opts ...Option) *Consumer { o := NewOptions(opts...) w := &Consumer{ - taskQueue: make(chan core.QueuedMessage, o.queueSize), - stop: make(chan struct{}), + taskQueue: make([]core.QueuedMessage, 2), + capacity: o.queueSize, exit: make(chan struct{}), logger: o.logger, runFunc: o.fn, From ab7b55a91804b57106a91221df5f4566e04c49a1 Mon Sep 17 00:00:00 2001 From: "Bo-Yi.Wu" Date: Sun, 15 Jan 2023 18:33:30 +0800 Subject: [PATCH 3/3] chore: update Signed-off-by: Bo-Yi.Wu --- consumer.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/consumer.go b/consumer.go index 952f81d..6e5a3f1 100644 --- a/consumer.go +++ b/consumer.go @@ -15,6 +15,7 @@ var errMaxCapacity = errors.New("max capacity reached") // Consumer for simple queue using buffer channel type Consumer struct { + sync.Mutex taskQueue []core.QueuedMessage runFunc func(context.Context, core.QueuedMessage) error capacity int @@ -55,12 +56,14 @@ func (s *Consumer) Queue(task core.QueuedMessage) error { //nolint:stylecheck return errMaxCapacity } + s.Lock() if s.count == len(s.taskQueue) { s.resize(s.count * 2) } s.taskQueue[s.tail] = task s.tail = (s.tail + 1) % len(s.taskQueue) s.count++ + s.Unlock() return nil } @@ -78,6 +81,7 @@ func (s *Consumer) Request() (core.QueuedMessage, error) { if s.count == 0 { return nil, ErrNoTaskInQueue } + s.Lock() data := s.taskQueue[s.head] s.head = (s.head + 1) % len(s.taskQueue) s.count-- @@ -85,6 +89,7 @@ func (s *Consumer) Request() (core.QueuedMessage, error) { if n := len(s.taskQueue) / 2; n > 2 && s.count <= n { s.resize(n) } + s.Unlock() return data, nil }