Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding unit tests for publisher output #17460

Merged
merged 36 commits into from
Apr 8, 2020
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
d7ffd02
Adding unit tests for publisher output
ycombinator Apr 3, 2020
7e49fce
Adding another unit test (TODO)
ycombinator Apr 3, 2020
5627ff9
Adding unit test for closing worker midway
ycombinator Apr 3, 2020
babb398
Reorganizing imports
ycombinator Apr 3, 2020
b7163dd
Output PRNG seed + provide flag to specify seed
ycombinator Apr 3, 2020
0034f4b
Cancel batch with netClient if it is closed
ycombinator Apr 3, 2020
c0ee69e
Use waitUntil loop instead of hard sleep
ycombinator Apr 3, 2020
a950b19
Making mockClient threadsafe
ycombinator Apr 3, 2020
8018169
Removing goroutine from happy path unit test
ycombinator Apr 3, 2020
a92ec73
Using testing/quick
ycombinator Apr 3, 2020
29f7d23
Increase batch sizes in tests
ycombinator Apr 4, 2020
9fa686d
Adding sleep to ensure some batches are still at time of close
ycombinator Apr 5, 2020
3b71cf1
Experiment witht with slihigher sleep time
ycombinator Apr 5, 2020
eb04fce
Moving sleep to publish time
ycombinator Apr 5, 2020
4f14aec
Increase publish latency
ycombinator Apr 5, 2020
c0de546
Increasing publish latency again
ycombinator Apr 5, 2020
c2a0dec
Removing publishLatency
ycombinator Apr 6, 2020
2352fc3
Fix timeout to large value
ycombinator Apr 6, 2020
ee896eb
Make first client block after publishing X events
ycombinator Apr 7, 2020
06985d6
Actually block publishing
ycombinator Apr 7, 2020
66942f1
Reduce number of batches to prevent running out of memory
ycombinator Apr 7, 2020
d936aee
Bumping up # of batches
ycombinator Apr 7, 2020
1813f12
Bumping up # of batches again
ycombinator Apr 7, 2020
de3d3f6
Try different strategy - publish 80% of events
ycombinator Apr 7, 2020
e8cde93
Cranking up sleep time in publish blocking
ycombinator Apr 7, 2020
6f89e19
Only publish first 20% of events
ycombinator Apr 7, 2020
915e6ce
Make sure to return batch for retrying
ycombinator Apr 7, 2020
8bb19ee
Adding debugging statements to see what's happening in Travis
ycombinator Apr 7, 2020
57b527b
More robust to race conditions
ycombinator Apr 7, 2020
310dc4a
Restricting quick iterations to 50 to see if that helps in Travis
ycombinator Apr 7, 2020
94f3445
Put entire loop into goroutine
ycombinator Apr 7, 2020
be4bf09
Renaming tests
ycombinator Apr 7, 2020
508b606
Emulate blocking + mockable publish behavior
ycombinator Apr 7, 2020
457434f
Removing retry and return
ycombinator Apr 8, 2020
3b3de96
Clarify intent with comment
ycombinator Apr 8, 2020
6a8793f
Setting # of quick iterations to 25
ycombinator Apr 8, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions libbeat/publisher/pipeline/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ func (w *clientWorker) Close() error {
func (w *clientWorker) run() {
for !w.closed.Load() {
for batch := range w.qu {
if w.closed.Load() {
if batch != nil {
batch.Cancelled()
}
return
}

w.observer.outBatchSend(len(batch.events))

if err := w.client.Publish(batch); err != nil {
Expand Down
260 changes: 260 additions & 0 deletions libbeat/publisher/pipeline/output_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pipeline

import (
"flag"
"math"
"math/rand"
"sync"
"testing"
"testing/quick"
"time"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

var (
SeedFlag = flag.Int64("seed", 0, "Randomization seed")
)

func TestPublish(t *testing.T) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can rename the test? We don't test publish, but the tests seem to check the behavior of the clientWorkers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in be4bf09.

tests := map[string]func(uint) publishCountable{
"client": newMockClient,
"network_client": newMockNetworkClient,
}

for name, ctor := range tests {
t.Run(name, func(t *testing.T) {
seedPRNG(t)

err := quick.Check(func(i uint) bool {
numBatches := 300 + (i % 100) // between 300 and 399

wqu := makeWorkQueue()
client := ctor(0)
makeClientWorker(nilObserver, wqu, client)

numEvents := atomic.MakeUint(0)
for batchIdx := uint(0); batchIdx <= numBatches; batchIdx++ {
batch := randomBatch(50, 150, wqu)
numEvents.Add(uint(len(batch.Events())))
wqu <- batch
}

// Give some time for events to be published
timeout := 20 * time.Second

// Make sure that all events have eventually been published
return waitUntilTrue(timeout, func() bool {
return numEvents.Load() == client.Published()
})
}, nil)

if err != nil {
t.Error(err)
}
})
}
}

func TestPublishWithClose(t *testing.T) {
tests := map[string]func(uint) publishCountable{
"client": newMockClient,
"network_client": newMockNetworkClient,
}

const minEventsInBatch = 50

for name, ctor := range tests {
t.Run(name, func(t *testing.T) {
seedPRNG(t)

err := quick.Check(func(i uint) bool {
numBatches := 1000 + (i % 100) // between 1000 and 1099

wqu := makeWorkQueue()
numEvents := atomic.MakeUint(0)

var wg sync.WaitGroup
for batchIdx := uint(0); batchIdx <= numBatches; batchIdx++ {
wg.Add(1)
go func() {
defer wg.Done()

batch := randomBatch(minEventsInBatch, 150, wqu)
numEvents.Add(uint(len(batch.Events())))
wqu <- batch
}()
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of creating a go-routine per batch, how about creating a single go-routine that will execute the for loop? This would actually more close to how the workqueue is used.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I will do that. Just out of curiosity, say there are multiple Filebeat inputs — don't they each get their own goroutine sending to the same publisher work queue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented in 94f3445.


// Publish at least 1 batch worth of events but no more than 20% events
publishLimit := uint(math.Min(minEventsInBatch, float64(numEvents.Load())*0.2))
client := ctor(publishLimit)
worker := makeClientWorker(nilObserver, wqu, client)

// Allow the worker to make *some* progress before we close it
timeout := 10 * time.Second
progress := waitUntilTrue(timeout, func() bool {
return client.Published() >= publishLimit
})
if !progress {
return false
}

// Close worker before all batches have had time to be published
err := worker.Close()
require.NoError(t, err)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here we might have a race. E.g. how can you tell not all batches have been consumed? Does the initial worker 'block' without ACKing the batch?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The initial worker does not block without ACKing the batch. Indeed, this is probably why I'm seeing different results locally vs. in Travis CI environment.


published := client.Published()

// Start new worker to drain work queue
client = ctor(0)
makeClientWorker(nilObserver, wqu, client)
wg.Wait()

// Make sure that all events have eventually been published
timeout = 20 * time.Second
return waitUntilTrue(timeout, func() bool {
total := published + client.Published()
return numEvents.Load() == total
})
}, &quick.Config{MaxCount: 50})
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keeping the MaxCount to 50 seems to keep Travis happy. Before that the build job was being killed because it was taking too long.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before introducing quick check the count was actually 1 :)

This is some kind of stress test. Unfortunately stress tests don't sit well with travis. We have had bad performance issues with the queue stress tests as well. I think long termI think we should not have stress tests run by travis, but have a separate job running those for even longer. For 'some' simple unit testing a count of 1 might be ok.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, good point :)

So would you recommend leaving this at 50, lowering it to 1 or maybe somewhere in between? I ask because while 50 is working at the moment I'm worried whether it'll become a source of flakiness. I don't think there's a way to know for sure until Travis runs this several times, though?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well, it's difficult to find the right value. Maybe set it to 25, so we have some more head-room.


if err != nil {
t.Error(err)
}
})
}
}

type publishCountable interface {
outputs.Client
Published() uint
}

func newMockClient(publishLimit uint) publishCountable {
return &mockClient{publishLimit: publishLimit}
}

type mockClient struct {
mu sync.RWMutex
publishLimit uint
published uint
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can excert more control by replacing published and publishLimit with a func. The func could decide when to block and when to continue. This way the test logic would be more contained within the test. WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented mockable publish behavior function in 508b606.

}

func (c *mockClient) Published() uint {
c.mu.RLock()
defer c.mu.RUnlock()

return c.published
}

func (c *mockClient) String() string { return "mock_client" }
func (c *mockClient) Close() error { return nil }
func (c *mockClient) Publish(batch publisher.Batch) error {
c.mu.Lock()
defer c.mu.Unlock()

// Block publishing
if c.publishLimit > 0 && c.published >= c.publishLimit {
batch.Retry() // to simulate not acking
return nil
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@urso WDYT about this logic to emulate blocking publishing?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This emulation is not blocking, but a failing output. The batch is not acked, but this Retrty + return nil will signal that the output is ready to consumer another batch.

A Blocking simulation would require you to wait for some signal (e.g. via a control channel).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At one point I had a sleep in here between the retry and return. The idea then was that the first client would be closed before the sleep finished.

A control channel is better than a sleep. Once the first client is closed I can close the control channel to remove the block. However, the Retry (before waiting to consume from the control channel) will still be needed, otherwise the final publish count doesn't add up to the expected total number of events.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented blocking with control channel in 508b606.


c.published += uint(len(batch.Events()))
return nil
}

func newMockNetworkClient(publishLimit uint) publishCountable {
return &mockNetworkClient{newMockClient(publishLimit)}
}

type mockNetworkClient struct {
publishCountable
}

func (c *mockNetworkClient) Connect() error { return nil }

type mockQueue struct{}

func (q mockQueue) Close() error { return nil }
func (q mockQueue) BufferConfig() queue.BufferConfig { return queue.BufferConfig{} }
func (q mockQueue) Producer(cfg queue.ProducerConfig) queue.Producer { return mockProducer{} }
func (q mockQueue) Consumer() queue.Consumer { return mockConsumer{} }

type mockProducer struct{}

func (p mockProducer) Publish(event publisher.Event) bool { return true }
func (p mockProducer) TryPublish(event publisher.Event) bool { return true }
func (p mockProducer) Cancel() int { return 0 }

type mockConsumer struct{}

func (c mockConsumer) Get(eventCount int) (queue.Batch, error) { return &Batch{}, nil }
func (c mockConsumer) Close() error { return nil }

func randomBatch(min, max int, wqu workQueue) *Batch {
numEvents := randIntBetween(min, max)
events := make([]publisher.Event, numEvents)

consumer := newEventConsumer(logp.L(), mockQueue{}, &batchContext{})
retryer := newRetryer(logp.L(), nilObserver, wqu, consumer)

batch := Batch{
events: events,
ctx: &batchContext{
observer: nilObserver,
retryer: retryer,
},
}

return &batch
}

// randIntBetween returns a random integer in [min, max)
func randIntBetween(min, max int) int {
return rand.Intn(max-min) + min
}

func seedPRNG(t *testing.T) {
seed := *SeedFlag
if seed == 0 {
seed = time.Now().UnixNano()
}

t.Logf("reproduce test with `go test ... -seed %v`", seed)
rand.Seed(seed)
}

func waitUntilTrue(duration time.Duration, fn func() bool) bool {
end := time.Now().Add(duration)
for time.Now().Before(end) {
if fn() {
return true
}
time.Sleep(1 * time.Millisecond)
}
return false
}