From 580db764f4bf300efa37a13b8f93194467c97b10 Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Mon, 4 Jan 2021 15:27:06 +0100 Subject: [PATCH 1/4] [pkg/queue] Add `StartConsumersWithFactory` function This provides a way to keep state for each consumer of a bounded queue, which is useful in certain performance-critical setups. Fixes #2685. Signed-off-by: Pablo Baeyens --- pkg/queue/bounded_queue.go | 39 +++++++++-- pkg/queue/bounded_queue_test.go | 110 ++++++++++++++++++++++++++++++++ 2 files changed, 142 insertions(+), 7 deletions(-) diff --git a/pkg/queue/bounded_queue.go b/pkg/queue/bounded_queue.go index db4c5356a4c..07d6ab25366 100644 --- a/pkg/queue/bounded_queue.go +++ b/pkg/queue/bounded_queue.go @@ -25,6 +25,11 @@ import ( uatomic "go.uber.org/atomic" ) +// Consumer consumes data from a bounded queue +type Consumer interface { + Consume(item interface{}) +} + // BoundedQueue implements a producer-consumer exchange similar to a ring buffer queue, // where the queue is bounded and if it fills up due to slow consumers, the new items written by // the producer force the earliest items to be dropped. The implementation is actually based on @@ -38,7 +43,7 @@ type BoundedQueue struct { stopped *uatomic.Uint32 items *chan interface{} onDroppedItem func(item interface{}) - consumer func(item interface{}) + factory func() Consumer stopCh chan struct{} } @@ -56,11 +61,11 @@ func NewBoundedQueue(capacity int, onDroppedItem func(item interface{})) *Bounde } } -// StartConsumers starts a given number of goroutines consuming items from the queue -// and passing them into the consumer callback. -func (q *BoundedQueue) StartConsumers(num int, consumer func(item interface{})) { +// StartConsumersWithFactory creates a given number of consumers consuming items +// from the queue in separate goroutines. +func (q *BoundedQueue) StartConsumersWithFactory(num int, factory func() Consumer) { q.workers = num - q.consumer = consumer + q.factory = factory var startWG sync.WaitGroup for i := 0; i < q.workers; i++ { q.stopWG.Add(1) @@ -68,13 +73,14 @@ func (q *BoundedQueue) StartConsumers(num int, consumer func(item interface{})) go func() { startWG.Done() defer q.stopWG.Done() + consumer := q.factory() queue := *q.items for { select { case item, ok := <-queue: if ok { q.size.Sub(1) - q.consumer(item) + consumer.Consume(item) } else { // channel closed, finish worker return @@ -89,6 +95,25 @@ func (q *BoundedQueue) StartConsumers(num int, consumer func(item interface{})) startWG.Wait() } +// statelessConsumer wraps a consume function callback +type statelessConsumer struct { + consumefn func(item interface{}) +} + +// Consumer consumes an item from a bounded queue +func (c *statelessConsumer) Consume(item interface{}) { + c.consumefn(item) +} + +// StartConsumers starts a given number of goroutines consuming items from the queue +// and passing them into the consumer callback. +func (q *BoundedQueue) StartConsumers(num int, callback func(item interface{})) { + consumer := &statelessConsumer{callback} + q.StartConsumersWithFactory(num, func() Consumer { + return consumer + }) +} + // Produce is used by the producer to submit new item to the queue. Returns false in case of queue overflow. func (q *BoundedQueue) Produce(item interface{}) bool { if q.stopped.Load() != 0 { @@ -171,7 +196,7 @@ func (q *BoundedQueue) Resize(capacity int) bool { swapped := atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&q.items)), unsafe.Pointer(q.items), unsafe.Pointer(&queue)) if swapped { // start a new set of consumers, based on the information given previously - q.StartConsumers(q.workers, q.consumer) + q.StartConsumersWithFactory(q.workers, q.factory) // gracefully drain the existing queue close(previous) diff --git a/pkg/queue/bounded_queue_test.go b/pkg/queue/bounded_queue_test.go index 1aa617a093b..577e3d22b14 100644 --- a/pkg/queue/bounded_queue_test.go +++ b/pkg/queue/bounded_queue_test.go @@ -112,6 +112,80 @@ func TestBoundedQueue(t *testing.T) { assert.False(t, q.Produce("x"), "cannot push to closed queue") } +func TestBoundedQueueWithFactory(t *testing.T) { + mFact := metricstest.NewFactory(0) + counter := mFact.Counter(metrics.Options{Name: "dropped", Tags: nil}) + gauge := mFact.Gauge(metrics.Options{Name: "size", Tags: nil}) + + q := NewBoundedQueue(1, func(item interface{}) { + counter.Inc(1) + }) + assert.Equal(t, 1, q.Capacity()) + + var startLock sync.Mutex + + startLock.Lock() // block consumers + consumerState := newConsumerState(t) + + q.StartConsumersWithFactory(1, func() Consumer { + return newStatefulConsumer(&startLock, consumerState) + }) + + assert.True(t, q.Produce("a")) + + // at this point "a" may or may not have been received by the consumer go-routine + // so let's make sure it has been + consumerState.waitToConsumeOnce() + + // at this point the item must have been read off the queue, but the consumer is blocked + assert.Equal(t, 0, q.Size()) + consumerState.assertConsumed(map[string]bool{ + "a": true, + }) + + // produce two more items. The first one should be accepted, but not consumed. + assert.True(t, q.Produce("b")) + assert.Equal(t, 1, q.Size()) + // the second should be rejected since the queue is full + assert.False(t, q.Produce("c")) + assert.Equal(t, 1, q.Size()) + + q.StartLengthReporting(time.Millisecond, gauge) + for i := 0; i < 1000; i++ { + _, g := mFact.Snapshot() + if g["size"] == 0 { + time.Sleep(time.Millisecond) + } else { + break + } + } + + c, g := mFact.Snapshot() + assert.EqualValues(t, 1, c["dropped"]) + assert.EqualValues(t, 1, g["size"]) + + startLock.Unlock() // unblock consumer + + consumerState.assertConsumed(map[string]bool{ + "a": true, + "b": true, + }) + + // now that consumers are unblocked, we can add more items + expected := map[string]bool{ + "a": true, + "b": true, + } + for _, item := range []string{"d", "e", "f"} { + assert.True(t, q.Produce(item)) + expected[item] = true + consumerState.assertConsumed(expected) + } + + q.Stop() + assert.False(t, q.Produce("x"), "cannot push to closed queue") +} + type consumerState struct { sync.Mutex t *testing.T @@ -161,6 +235,24 @@ func (s *consumerState) assertConsumed(expected map[string]bool) { assert.Equal(s.t, expected, s.snapshot()) } +type statefulConsumer struct { + *sync.Mutex + *consumerState +} + +func (s *statefulConsumer) Consume(item interface{}) { + s.record(item.(string)) + + // block further processing until the lock is released + s.Lock() + //lint:ignore SA2001 empty section is ok + s.Unlock() +} + +func newStatefulConsumer(startLock *sync.Mutex, cs *consumerState) Consumer { + return &statefulConsumer{startLock, cs} +} + func TestResizeUp(t *testing.T) { q := NewBoundedQueue(2, func(item interface{}) { fmt.Printf("dropped: %v\n", item) @@ -332,3 +424,21 @@ func BenchmarkBoundedQueue(b *testing.B) { q.Produce(n) } } + +// nopConsumer is a no-op consumer +type nopConsumer struct{} + +func (*nopConsumer) Consume(item interface{}) {} + +func BenchmarkBoundedQueueWithFactory(b *testing.B) { + q := NewBoundedQueue(1000, func(item interface{}) { + }) + + q.StartConsumersWithFactory(10, func() Consumer { + return &nopConsumer{} + }) + + for n := 0; n < b.N; n++ { + q.Produce(n) + } +} From 26a349e6b3278fa0ff194ad846d99a8ceb18f961 Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Wed, 6 Jan 2021 14:03:31 +0100 Subject: [PATCH 2/4] Refactor bounded queue tests to extract common parts The common part with the assertions was moved to a new `checkQueue` function that is used by `TestBoundedQueue` and `TestBoundedQueueWithFactory`. Signed-off-by: Pablo Baeyens --- pkg/queue/bounded_queue_test.go | 109 ++++++++++---------------------- 1 file changed, 34 insertions(+), 75 deletions(-) diff --git a/pkg/queue/bounded_queue_test.go b/pkg/queue/bounded_queue_test.go index 577e3d22b14..f12e899e9a9 100644 --- a/pkg/queue/bounded_queue_test.go +++ b/pkg/queue/bounded_queue_test.go @@ -30,33 +30,14 @@ import ( uatomic "go.uber.org/atomic" ) -// In this test we run a queue with capacity 1 and a single consumer. -// We want to test the overflow behavior, so we block the consumer -// by holding a startLock before submitting items to the queue. -func TestBoundedQueue(t *testing.T) { - mFact := metricstest.NewFactory(0) - counter := mFact.Counter(metrics.Options{Name: "dropped", Tags: nil}) +func checkQueue( + t *testing.T, + q *BoundedQueue, + startLock *sync.Mutex, + consumerState *consumerState, + mFact *metricstest.Factory, +) { gauge := mFact.Gauge(metrics.Options{Name: "size", Tags: nil}) - - q := NewBoundedQueue(1, func(item interface{}) { - counter.Inc(1) - }) - assert.Equal(t, 1, q.Capacity()) - - var startLock sync.Mutex - - startLock.Lock() // block consumers - consumerState := newConsumerState(t) - - q.StartConsumers(1, func(item interface{}) { - consumerState.record(item.(string)) - - // block further processing until startLock is released - startLock.Lock() - //lint:ignore SA2001 empty section is ok - startLock.Unlock() - }) - assert.True(t, q.Produce("a")) // at this point "a" may or may not have been received by the consumer go-routine @@ -112,10 +93,12 @@ func TestBoundedQueue(t *testing.T) { assert.False(t, q.Produce("x"), "cannot push to closed queue") } -func TestBoundedQueueWithFactory(t *testing.T) { +// In this test we run a queue with capacity 1 and a single consumer. +// We want to test the overflow behavior, so we block the consumer +// by holding a startLock before submitting items to the queue. +func TestBoundedQueue(t *testing.T) { mFact := metricstest.NewFactory(0) counter := mFact.Counter(metrics.Options{Name: "dropped", Tags: nil}) - gauge := mFact.Gauge(metrics.Options{Name: "size", Tags: nil}) q := NewBoundedQueue(1, func(item interface{}) { counter.Inc(1) @@ -127,63 +110,39 @@ func TestBoundedQueueWithFactory(t *testing.T) { startLock.Lock() // block consumers consumerState := newConsumerState(t) - q.StartConsumersWithFactory(1, func() Consumer { - return newStatefulConsumer(&startLock, consumerState) + q.StartConsumers(1, func(item interface{}) { + consumerState.record(item.(string)) + + // block further processing until startLock is released + startLock.Lock() + //lint:ignore SA2001 empty section is ok + startLock.Unlock() }) - assert.True(t, q.Produce("a")) + checkQueue(t, q, &startLock, consumerState, mFact) +} - // at this point "a" may or may not have been received by the consumer go-routine - // so let's make sure it has been - consumerState.waitToConsumeOnce() +// This test is identical to the previous one but we start the +// queue using a consumer factory instead of a callback. +func TestBoundedQueueWithFactory(t *testing.T) { + mFact := metricstest.NewFactory(0) + counter := mFact.Counter(metrics.Options{Name: "dropped", Tags: nil}) - // at this point the item must have been read off the queue, but the consumer is blocked - assert.Equal(t, 0, q.Size()) - consumerState.assertConsumed(map[string]bool{ - "a": true, + q := NewBoundedQueue(1, func(item interface{}) { + counter.Inc(1) }) + assert.Equal(t, 1, q.Capacity()) - // produce two more items. The first one should be accepted, but not consumed. - assert.True(t, q.Produce("b")) - assert.Equal(t, 1, q.Size()) - // the second should be rejected since the queue is full - assert.False(t, q.Produce("c")) - assert.Equal(t, 1, q.Size()) - - q.StartLengthReporting(time.Millisecond, gauge) - for i := 0; i < 1000; i++ { - _, g := mFact.Snapshot() - if g["size"] == 0 { - time.Sleep(time.Millisecond) - } else { - break - } - } - - c, g := mFact.Snapshot() - assert.EqualValues(t, 1, c["dropped"]) - assert.EqualValues(t, 1, g["size"]) + var startLock sync.Mutex - startLock.Unlock() // unblock consumer + startLock.Lock() // block consumers + consumerState := newConsumerState(t) - consumerState.assertConsumed(map[string]bool{ - "a": true, - "b": true, + q.StartConsumersWithFactory(1, func() Consumer { + return newStatefulConsumer(&startLock, consumerState) }) - // now that consumers are unblocked, we can add more items - expected := map[string]bool{ - "a": true, - "b": true, - } - for _, item := range []string{"d", "e", "f"} { - assert.True(t, q.Produce(item)) - expected[item] = true - consumerState.assertConsumed(expected) - } - - q.Stop() - assert.False(t, q.Produce("x"), "cannot push to closed queue") + checkQueue(t, q, &startLock, consumerState, mFact) } type consumerState struct { From d968bd5d27aa83f80f610b36c66696b8be600afb Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Thu, 7 Jan 2021 11:09:24 +0100 Subject: [PATCH 3/4] Use http.HandlerFunc pattern for getting a Consumer from a callback Signed-off-by: Pablo Baeyens --- pkg/queue/bounded_queue.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/pkg/queue/bounded_queue.go b/pkg/queue/bounded_queue.go index 07d6ab25366..0ade4b02c7b 100644 --- a/pkg/queue/bounded_queue.go +++ b/pkg/queue/bounded_queue.go @@ -95,22 +95,20 @@ func (q *BoundedQueue) StartConsumersWithFactory(num int, factory func() Consume startWG.Wait() } -// statelessConsumer wraps a consume function callback -type statelessConsumer struct { - consumefn func(item interface{}) -} +// ConsumerFunc is an adapter to allow the use of +// a consume function callback as a Consumer. +type ConsumerFunc func(item interface{}) -// Consumer consumes an item from a bounded queue -func (c *statelessConsumer) Consume(item interface{}) { - c.consumefn(item) +// Consume calls c(item) +func (c ConsumerFunc) Consume(item interface{}) { + c(item) } // StartConsumers starts a given number of goroutines consuming items from the queue // and passing them into the consumer callback. func (q *BoundedQueue) StartConsumers(num int, callback func(item interface{})) { - consumer := &statelessConsumer{callback} q.StartConsumersWithFactory(num, func() Consumer { - return consumer + return ConsumerFunc(callback) }) } From c955e1dad17665b7138e0b633345277c959f78e7 Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Thu, 7 Jan 2021 11:40:53 +0100 Subject: [PATCH 4/4] Address review comment about unit tests Refactor unit tests using a `helper` function that takes a function to start consumers. Signed-off-by: Pablo Baeyens --- pkg/queue/bounded_queue_test.go | 106 ++++++++++---------------------- 1 file changed, 31 insertions(+), 75 deletions(-) diff --git a/pkg/queue/bounded_queue_test.go b/pkg/queue/bounded_queue_test.go index f12e899e9a9..1091f3f370f 100644 --- a/pkg/queue/bounded_queue_test.go +++ b/pkg/queue/bounded_queue_test.go @@ -30,14 +30,33 @@ import ( uatomic "go.uber.org/atomic" ) -func checkQueue( - t *testing.T, - q *BoundedQueue, - startLock *sync.Mutex, - consumerState *consumerState, - mFact *metricstest.Factory, -) { +// In this test we run a queue with capacity 1 and a single consumer. +// We want to test the overflow behavior, so we block the consumer +// by holding a startLock before submitting items to the queue. +func helper(t *testing.T, startConsumers func(q *BoundedQueue, consumerFn func(item interface{}))) { + mFact := metricstest.NewFactory(0) + counter := mFact.Counter(metrics.Options{Name: "dropped", Tags: nil}) gauge := mFact.Gauge(metrics.Options{Name: "size", Tags: nil}) + + q := NewBoundedQueue(1, func(item interface{}) { + counter.Inc(1) + }) + assert.Equal(t, 1, q.Capacity()) + + var startLock sync.Mutex + + startLock.Lock() // block consumers + consumerState := newConsumerState(t) + + startConsumers(q, func(item interface{}) { + consumerState.record(item.(string)) + + // block further processing until startLock is released + startLock.Lock() + //lint:ignore SA2001 empty section is ok + startLock.Unlock() + }) + assert.True(t, q.Produce("a")) // at this point "a" may or may not have been received by the consumer go-routine @@ -93,56 +112,16 @@ func checkQueue( assert.False(t, q.Produce("x"), "cannot push to closed queue") } -// In this test we run a queue with capacity 1 and a single consumer. -// We want to test the overflow behavior, so we block the consumer -// by holding a startLock before submitting items to the queue. func TestBoundedQueue(t *testing.T) { - mFact := metricstest.NewFactory(0) - counter := mFact.Counter(metrics.Options{Name: "dropped", Tags: nil}) - - q := NewBoundedQueue(1, func(item interface{}) { - counter.Inc(1) - }) - assert.Equal(t, 1, q.Capacity()) - - var startLock sync.Mutex - - startLock.Lock() // block consumers - consumerState := newConsumerState(t) - - q.StartConsumers(1, func(item interface{}) { - consumerState.record(item.(string)) - - // block further processing until startLock is released - startLock.Lock() - //lint:ignore SA2001 empty section is ok - startLock.Unlock() + helper(t, func(q *BoundedQueue, consumerFn func(item interface{})) { + q.StartConsumers(1, consumerFn) }) - - checkQueue(t, q, &startLock, consumerState, mFact) } -// This test is identical to the previous one but we start the -// queue using a consumer factory instead of a callback. func TestBoundedQueueWithFactory(t *testing.T) { - mFact := metricstest.NewFactory(0) - counter := mFact.Counter(metrics.Options{Name: "dropped", Tags: nil}) - - q := NewBoundedQueue(1, func(item interface{}) { - counter.Inc(1) - }) - assert.Equal(t, 1, q.Capacity()) - - var startLock sync.Mutex - - startLock.Lock() // block consumers - consumerState := newConsumerState(t) - - q.StartConsumersWithFactory(1, func() Consumer { - return newStatefulConsumer(&startLock, consumerState) + helper(t, func(q *BoundedQueue, consumerFn func(item interface{})) { + q.StartConsumersWithFactory(1, func() Consumer { return ConsumerFunc(consumerFn) }) }) - - checkQueue(t, q, &startLock, consumerState, mFact) } type consumerState struct { @@ -194,24 +173,6 @@ func (s *consumerState) assertConsumed(expected map[string]bool) { assert.Equal(s.t, expected, s.snapshot()) } -type statefulConsumer struct { - *sync.Mutex - *consumerState -} - -func (s *statefulConsumer) Consume(item interface{}) { - s.record(item.(string)) - - // block further processing until the lock is released - s.Lock() - //lint:ignore SA2001 empty section is ok - s.Unlock() -} - -func newStatefulConsumer(startLock *sync.Mutex, cs *consumerState) Consumer { - return &statefulConsumer{startLock, cs} -} - func TestResizeUp(t *testing.T) { q := NewBoundedQueue(2, func(item interface{}) { fmt.Printf("dropped: %v\n", item) @@ -384,17 +345,12 @@ func BenchmarkBoundedQueue(b *testing.B) { } } -// nopConsumer is a no-op consumer -type nopConsumer struct{} - -func (*nopConsumer) Consume(item interface{}) {} - func BenchmarkBoundedQueueWithFactory(b *testing.B) { q := NewBoundedQueue(1000, func(item interface{}) { }) q.StartConsumersWithFactory(10, func() Consumer { - return &nopConsumer{} + return ConsumerFunc(func(item interface{}) {}) }) for n := 0; n < b.N; n++ {