Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding unit tests for publisher output #17460

Merged
merged 36 commits into from
Apr 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
d7ffd02
Adding unit tests for publisher output
ycombinator Apr 3, 2020
7e49fce
Adding another unit test (TODO)
ycombinator Apr 3, 2020
5627ff9
Adding unit test for closing worker midway
ycombinator Apr 3, 2020
babb398
Reorganizing imports
ycombinator Apr 3, 2020
b7163dd
Output PRNG seed + provide flag to specify seed
ycombinator Apr 3, 2020
0034f4b
Cancel batch with netClient if it is closed
ycombinator Apr 3, 2020
c0ee69e
Use waitUntil loop instead of hard sleep
ycombinator Apr 3, 2020
a950b19
Making mockClient threadsafe
ycombinator Apr 3, 2020
8018169
Removing goroutine from happy path unit test
ycombinator Apr 3, 2020
a92ec73
Using testing/quick
ycombinator Apr 3, 2020
29f7d23
Increase batch sizes in tests
ycombinator Apr 4, 2020
9fa686d
Adding sleep to ensure some batches are still at time of close
ycombinator Apr 5, 2020
3b71cf1
Experiment witht with slihigher sleep time
ycombinator Apr 5, 2020
eb04fce
Moving sleep to publish time
ycombinator Apr 5, 2020
4f14aec
Increase publish latency
ycombinator Apr 5, 2020
c0de546
Increasing publish latency again
ycombinator Apr 5, 2020
c2a0dec
Removing publishLatency
ycombinator Apr 6, 2020
2352fc3
Fix timeout to large value
ycombinator Apr 6, 2020
ee896eb
Make first client block after publishing X events
ycombinator Apr 7, 2020
06985d6
Actually block publishing
ycombinator Apr 7, 2020
66942f1
Reduce number of batches to prevent running out of memory
ycombinator Apr 7, 2020
d936aee
Bumping up # of batches
ycombinator Apr 7, 2020
1813f12
Bumping up # of batches again
ycombinator Apr 7, 2020
de3d3f6
Try different strategy - publish 80% of events
ycombinator Apr 7, 2020
e8cde93
Cranking up sleep time in publish blocking
ycombinator Apr 7, 2020
6f89e19
Only publish first 20% of events
ycombinator Apr 7, 2020
915e6ce
Make sure to return batch for retrying
ycombinator Apr 7, 2020
8bb19ee
Adding debugging statements to see what's happening in Travis
ycombinator Apr 7, 2020
57b527b
More robust to race conditions
ycombinator Apr 7, 2020
310dc4a
Restricting quick iterations to 50 to see if that helps in Travis
ycombinator Apr 7, 2020
94f3445
Put entire loop into goroutine
ycombinator Apr 7, 2020
be4bf09
Renaming tests
ycombinator Apr 7, 2020
508b606
Emulate blocking + mockable publish behavior
ycombinator Apr 7, 2020
457434f
Removing retry and return
ycombinator Apr 8, 2020
3b3de96
Clarify intent with comment
ycombinator Apr 8, 2020
6a8793f
Setting # of quick iterations to 25
ycombinator Apr 8, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions libbeat/publisher/pipeline/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ func (w *clientWorker) Close() error {
func (w *clientWorker) run() {
for !w.closed.Load() {
for batch := range w.qu {
if w.closed.Load() {
if batch != nil {
batch.Cancelled()
}
return
}

w.observer.outBatchSend(len(batch.events))

if err := w.client.Publish(batch); err != nil {
Expand Down
261 changes: 261 additions & 0 deletions libbeat/publisher/pipeline/output_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pipeline

import (
"flag"
"math"
"math/rand"
"sync"
"testing"
"testing/quick"
"time"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

var (
SeedFlag = flag.Int64("seed", 0, "Randomization seed")
)

func TestMakeClientWorker(t *testing.T) {
tests := map[string]func(mockPublishFn) outputs.Client{
"client": newMockClient,
"network_client": newMockNetworkClient,
}

for name, ctor := range tests {
t.Run(name, func(t *testing.T) {
seedPRNG(t)

err := quick.Check(func(i uint) bool {
numBatches := 300 + (i % 100) // between 300 and 399

var published atomic.Uint
publishFn := func(batch publisher.Batch) error {
published.Add(uint(len(batch.Events())))
return nil
}

wqu := makeWorkQueue()
client := ctor(publishFn)
makeClientWorker(nilObserver, wqu, client)

numEvents := atomic.MakeUint(0)
for batchIdx := uint(0); batchIdx <= numBatches; batchIdx++ {
batch := randomBatch(50, 150, wqu)
numEvents.Add(uint(len(batch.Events())))
wqu <- batch
}

// Give some time for events to be published
timeout := 20 * time.Second

// Make sure that all events have eventually been published
return waitUntilTrue(timeout, func() bool {
return numEvents == published
})
}, nil)

if err != nil {
t.Error(err)
}
})
}
}

func TestMakeClientWorkerAndClose(t *testing.T) {
tests := map[string]func(mockPublishFn) outputs.Client{
"client": newMockClient,
"network_client": newMockNetworkClient,
}

const minEventsInBatch = 50

for name, ctor := range tests {
t.Run(name, func(t *testing.T) {
seedPRNG(t)

err := quick.Check(func(i uint) bool {
numBatches := 1000 + (i % 100) // between 1000 and 1099

wqu := makeWorkQueue()
numEvents := atomic.MakeUint(0)

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for batchIdx := uint(0); batchIdx <= numBatches; batchIdx++ {
batch := randomBatch(minEventsInBatch, 150, wqu)
numEvents.Add(uint(len(batch.Events())))
wqu <- batch
}
}()

// Publish at least 1 batch worth of events but no more than 20% events
publishLimit := uint(math.Max(minEventsInBatch, float64(numEvents.Load())*0.2))

var publishedFirst atomic.Uint
blockCtrl := make(chan struct{})
blockingPublishFn := func(batch publisher.Batch) error {
// Emulate blocking. Upon unblocking the in-flight batch that was
// blocked is published.
if publishedFirst.Load() >= publishLimit {
<-blockCtrl
}

publishedFirst.Add(uint(len(batch.Events())))
return nil
}

client := ctor(blockingPublishFn)
worker := makeClientWorker(nilObserver, wqu, client)

// Allow the worker to make *some* progress before we close it
timeout := 10 * time.Second
progress := waitUntilTrue(timeout, func() bool {
return publishedFirst.Load() >= publishLimit
})
if !progress {
return false
}

// Close worker before all batches have had time to be published
err := worker.Close()
require.NoError(t, err)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here we might have a race. E.g. how can you tell not all batches have been consumed? Does the initial worker 'block' without ACKing the batch?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The initial worker does not block without ACKing the batch. Indeed, this is probably why I'm seeing different results locally vs. in Travis CI environment.

close(blockCtrl)

// Start new worker to drain work queue
var publishedLater atomic.Uint
countingPublishFn := func(batch publisher.Batch) error {
publishedLater.Add(uint(len(batch.Events())))
return nil
}

client = ctor(countingPublishFn)
makeClientWorker(nilObserver, wqu, client)
wg.Wait()

// Make sure that all events have eventually been published
timeout = 20 * time.Second
return waitUntilTrue(timeout, func() bool {
return numEvents.Load() == publishedFirst.Load()+publishedLater.Load()
})
}, &quick.Config{MaxCount: 25})

if err != nil {
t.Error(err)
}
})
}
}

type mockPublishFn func(publisher.Batch) error

func newMockClient(publishFn mockPublishFn) outputs.Client {
return &mockClient{publishFn: publishFn}
}

type mockClient struct {
publishFn mockPublishFn
}

func (c *mockClient) String() string { return "mock_client" }
func (c *mockClient) Close() error { return nil }
func (c *mockClient) Publish(batch publisher.Batch) error {
return c.publishFn(batch)
}

func newMockNetworkClient(publishFn mockPublishFn) outputs.Client {
return &mockNetworkClient{newMockClient(publishFn)}
}

type mockNetworkClient struct {
outputs.Client
}

func (c *mockNetworkClient) Connect() error { return nil }

type mockQueue struct{}

func (q mockQueue) Close() error { return nil }
func (q mockQueue) BufferConfig() queue.BufferConfig { return queue.BufferConfig{} }
func (q mockQueue) Producer(cfg queue.ProducerConfig) queue.Producer { return mockProducer{} }
func (q mockQueue) Consumer() queue.Consumer { return mockConsumer{} }

type mockProducer struct{}

func (p mockProducer) Publish(event publisher.Event) bool { return true }
func (p mockProducer) TryPublish(event publisher.Event) bool { return true }
func (p mockProducer) Cancel() int { return 0 }

type mockConsumer struct{}

func (c mockConsumer) Get(eventCount int) (queue.Batch, error) { return &Batch{}, nil }
func (c mockConsumer) Close() error { return nil }

func randomBatch(min, max int, wqu workQueue) *Batch {
numEvents := randIntBetween(min, max)
events := make([]publisher.Event, numEvents)

consumer := newEventConsumer(logp.L(), mockQueue{}, &batchContext{})
retryer := newRetryer(logp.L(), nilObserver, wqu, consumer)

batch := Batch{
events: events,
ctx: &batchContext{
observer: nilObserver,
retryer: retryer,
},
}

return &batch
}

// randIntBetween returns a random integer in [min, max)
func randIntBetween(min, max int) int {
return rand.Intn(max-min) + min
}

func seedPRNG(t *testing.T) {
seed := *SeedFlag
if seed == 0 {
seed = time.Now().UnixNano()
}

t.Logf("reproduce test with `go test ... -seed %v`", seed)
rand.Seed(seed)
}

func waitUntilTrue(duration time.Duration, fn func() bool) bool {
end := time.Now().Add(duration)
for time.Now().Before(end) {
if fn() {
return true
}
time.Sleep(1 * time.Millisecond)
}
return false
}