Skip to content

chore(queue): replace channel queue with ring buffer queue. #99

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,43 @@ import (
"github.com/golang-queue/queue/job"
)

var count = 1

type testqueue interface {
Queue(task core.QueuedMessage) error
Request() (core.QueuedMessage, error)
}

func testQueue(b *testing.B, pool testqueue) {
message := job.NewTask(func(context.Context) error {
return nil
},
job.AllowOption{
RetryCount: job.Int64(100),
RetryDelay: job.Time(30 * time.Millisecond),
Timeout: job.Time(3 * time.Millisecond),
},
)

b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
for i := 0; i < count; i++ {
_ = pool.Queue(message)
_, _ = pool.Request()
}
}
}

func BenchmarkNewCusumer(b *testing.B) {
pool := NewConsumer(
WithQueueSize(b.N*count),
WithLogger(emptyLogger{}),
)

testQueue(b, pool)
}

func BenchmarkQueueTask(b *testing.B) {
b.ReportAllocs()
q := NewPool(5, WithLogger(emptyLogger{}))
Expand Down
80 changes: 56 additions & 24 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@ var errMaxCapacity = errors.New("max capacity reached")

// Consumer for simple queue using buffer channel
type Consumer struct {
taskQueue chan core.QueuedMessage
sync.Mutex
taskQueue []core.QueuedMessage
runFunc func(context.Context, core.QueuedMessage) error
stop chan struct{}
capacity int
count int
head int
tail int
exit chan struct{}
logger Logger
stopOnce sync.Once
Expand All @@ -36,52 +40,80 @@ func (s *Consumer) Shutdown() error {
}

s.stopOnce.Do(func() {
close(s.stop)
close(s.taskQueue)
if len(s.taskQueue) > 0 {
if s.count > 0 {
<-s.exit
}
})
return nil
}

// Queue send task to the buffer channel
func (s *Consumer) Queue(task core.QueuedMessage) error {
func (s *Consumer) Queue(task core.QueuedMessage) error { //nolint:stylecheck
if atomic.LoadInt32(&s.stopFlag) == 1 {
return ErrQueueShutdown
}

select {
case s.taskQueue <- task:
return nil
default:
if s.count >= s.capacity {
return errMaxCapacity
}

s.Lock()
if s.count == len(s.taskQueue) {
s.resize(s.count * 2)
}
s.taskQueue[s.tail] = task
s.tail = (s.tail + 1) % len(s.taskQueue)
s.count++
s.Unlock()

return nil
}

// Request a new task from channel
func (s *Consumer) Request() (core.QueuedMessage, error) {
select {
case task, ok := <-s.taskQueue:
if !ok {
select {
case s.exit <- struct{}{}:
default:
}
return nil, ErrQueueHasBeenClosed
if atomic.LoadInt32(&s.stopFlag) == 1 && s.count == 0 {
select {
case s.exit <- struct{}{}:
default:
}
return task, nil
default:
return nil, ErrQueueHasBeenClosed
}

if s.count == 0 {
return nil, ErrNoTaskInQueue
}
s.Lock()
data := s.taskQueue[s.head]
s.head = (s.head + 1) % len(s.taskQueue)
s.count--

if n := len(s.taskQueue) / 2; n > 2 && s.count <= n {
s.resize(n)
}
s.Unlock()

return data, nil
}

func (q *Consumer) resize(n int) {
nodes := make([]core.QueuedMessage, n)
if q.head < q.tail {
copy(nodes, q.taskQueue[q.head:q.tail])
} else {
copy(nodes, q.taskQueue[q.head:])
copy(nodes[len(q.taskQueue)-q.head:], q.taskQueue[:q.tail])
}

q.tail = q.count % n
q.head = 0
q.taskQueue = nodes
}

// NewConsumer for create new consumer instance
// NewConsumer for create new Consumer instance
func NewConsumer(opts ...Option) *Consumer {
o := NewOptions(opts...)
w := &Consumer{
taskQueue: make(chan core.QueuedMessage, o.queueSize),
stop: make(chan struct{}),
taskQueue: make([]core.QueuedMessage, 2),
capacity: o.queueSize,
exit: make(chan struct{}),
logger: o.logger,
runFunc: o.fn,
Expand Down