Output reloading flaky test: Log when test fails (#18071) (#18740)
* Log numbers if test fails

* Logging counts when test fails

* Introducing a buffered logger for testing

* Adding mutex to logger

* Fleshing out logger interface a bit more

* Pass down test logger

* Adding comment
ycombinator authored Jun 1, 2020
1 parent bcff902 commit d05a4fe
Showing 5 changed files with 115 additions and 19 deletions.
4 changes: 3 additions & 1 deletion libbeat/publisher/pipeline/controller.go
@@ -21,6 +21,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
@@ -105,7 +106,8 @@ func (c *outputController) Set(outGrp outputs.Group) {
clients := outGrp.Clients
worker := make([]outputWorker, len(clients))
for i, client := range clients {
- worker[i] = makeClientWorker(c.observer, c.workQueue, client, c.monitors.Tracer)
logger := logp.NewLogger("publisher_pipeline_output")
worker[i] = makeClientWorker(c.observer, c.workQueue, client, logger, c.monitors.Tracer)
}
grp := &outputGroup{
workQueue: c.workQueue,
29 changes: 29 additions & 0 deletions libbeat/publisher/pipeline/logger.go
@@ -0,0 +1,29 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pipeline

type logger interface {
Debug(vs ...interface{})
Debugf(fmt string, vs ...interface{})

Info(vs ...interface{})
Infof(fmt string, vs ...interface{})

Error(vs ...interface{})
Errorf(fmt string, vs ...interface{})
}
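
For orientation (this is editor-added, not part of the commit): the production *logp.Logger already provides these six methods, and the bufLogger helper added in output_test.go implements them as well, which is what lets makeClientWorker and newRetryer accept either one. A hypothetical compile-time check, placed in a _test.go file of the pipeline package, would state that explicitly:

package pipeline

import "github.com/elastic/beats/v7/libbeat/logp"

// Illustrative assertions only (hypothetical, not in this commit): if either
// type stopped satisfying the pipeline's logger interface, the build would fail.
var (
	_ logger = (*logp.Logger)(nil) // production logger, created in controller.go
	_ logger = (*bufLogger)(nil)   // buffered test logger, defined in output_test.go
)

Keeping the interface this small is what allows the tests to swap in a buffering implementation without configuring logp at all.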
7 changes: 3 additions & 4 deletions libbeat/publisher/pipeline/output.go
@@ -25,7 +25,6 @@ import (

"go.elastic.co/apm"

"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/outputs"
)

@@ -49,12 +48,12 @@ type netClientWorker struct {

batchSize int
batchSizer func() int
- logger *logp.Logger
logger logger

tracer *apm.Tracer
}

- func makeClientWorker(observer outputObserver, qu workQueue, client outputs.Client, tracer *apm.Tracer) outputWorker {
func makeClientWorker(observer outputObserver, qu workQueue, client outputs.Client, logger logger, tracer *apm.Tracer) outputWorker {
w := worker{
observer: observer,
qu: qu,
@@ -70,7 +69,7 @@ func makeClientWorker(observer outputObserver, qu workQueue, client outputs.Clie
c = &netClientWorker{
worker: w,
client: nc,
- logger: logp.NewLogger("publisher_pipeline_output"),
logger: logger,
tracer: tracer,
}
} else {
88 changes: 78 additions & 10 deletions libbeat/publisher/pipeline/output_test.go
@@ -18,7 +18,9 @@
package pipeline

import (
"fmt"
"math"
"strings"
"sync"
"testing"
"testing/quick"
@@ -30,7 +32,6 @@ import (

"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/internal/testutil"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/publisher"
)
@@ -49,8 +50,10 @@ func TestMakeClientWorker(t *testing.T) {
numBatches := 300 + (i % 100) // between 300 and 399
numEvents := atomic.MakeUint(0)

logger := makeBufLogger(t)

wqu := makeWorkQueue()
- retryer := newRetryer(logp.NewLogger("test"), nilObserver, wqu, nil)
retryer := newRetryer(logger, nilObserver, wqu, nil)
defer retryer.close()

var published atomic.Uint
@@ -61,7 +64,7 @@

client := ctor(publishFn)

- worker := makeClientWorker(nilObserver, wqu, client, nil)
worker := makeClientWorker(nilObserver, wqu, client, logger, nil)
defer worker.Close()

for i := uint(0); i < numBatches; i++ {
@@ -74,9 +77,14 @@
timeout := 20 * time.Second

// Make sure that all events have eventually been published
- return waitUntilTrue(timeout, func() bool {
success := waitUntilTrue(timeout, func() bool {
return numEvents == published
})
if !success {
logger.Flush()
t.Logf("numBatches = %v, numEvents = %v, published = %v", numBatches, numEvents, published)
}
return success
}, nil)

if err != nil {
@@ -102,8 +110,10 @@ func TestReplaceClientWorker(t *testing.T) {
err := quick.Check(func(i uint) bool {
numBatches := 1000 + (i % 100) // between 1000 and 1099

logger := makeBufLogger(t)

wqu := makeWorkQueue()
- retryer := newRetryer(logp.NewLogger("test"), nilObserver, wqu, nil)
retryer := newRetryer(logger, nilObserver, wqu, nil)
defer retryer.close()

var batches []publisher.Batch
@@ -140,7 +150,7 @@
}

client := ctor(blockingPublishFn)
- worker := makeClientWorker(nilObserver, wqu, client, nil)
worker := makeClientWorker(nilObserver, wqu, client, logger, nil)

// Allow the worker to make *some* progress before we close it
timeout := 10 * time.Second
@@ -165,14 +175,20 @@
}

client = ctor(countingPublishFn)
- makeClientWorker(nilObserver, wqu, client, nil)
makeClientWorker(nilObserver, wqu, client, logger, nil)
wg.Wait()

// Make sure that all events have eventually been published
timeout = 20 * time.Second
- return waitUntilTrue(timeout, func() bool {
success := waitUntilTrue(timeout, func() bool {
return numEvents == int(publishedFirst.Load()+publishedLater.Load())
})
if !success {
logger.Flush()
t.Logf("numBatches = %v, numEvents = %v, publishedFirst = %v, publishedLater = %v",
numBatches, numEvents, publishedFirst.Load(), publishedLater.Load())
}
return success
}, &quick.Config{MaxCount: 25})

if err != nil {
@@ -188,8 +204,10 @@ func TestMakeClientTracer(t *testing.T) {
numBatches := 10
numEvents := atomic.MakeUint(0)

logger := makeBufLogger(t)

wqu := makeWorkQueue()
- retryer := newRetryer(logp.NewLogger("test"), nilObserver, wqu, nil)
retryer := newRetryer(logger, nilObserver, wqu, nil)
defer retryer.close()

var published atomic.Uint
@@ -203,7 +221,7 @@
recorder := apmtest.NewRecordingTracer()
defer recorder.Close()

- worker := makeClientWorker(nilObserver, wqu, client, recorder.Tracer)
worker := makeClientWorker(nilObserver, wqu, client, logger, recorder.Tracer)
defer worker.Close()

for i := 0; i < numBatches; i++ {
@@ -227,6 +245,56 @@
apmEvents := recorder.Payloads()
transactions := apmEvents.Transactions
if len(transactions) != numBatches {
logger.Flush()
t.Errorf("expected %d traces, got %d", numBatches, len(transactions))
}
}

// bufLogger is a buffered logger. It does not immediately print out log lines; instead it
// buffers them. To print them out, one must explicitly call its Flush() method. This is
// useful when you want to see the logs only when tests fail but not when they pass.
type bufLogger struct {
t *testing.T
lines []string
mu sync.RWMutex
}

func (l *bufLogger) Debug(vs ...interface{}) { l.report("DEBUG", vs) }
func (l *bufLogger) Debugf(fmt string, vs ...interface{}) { l.reportf("DEBUG ", fmt, vs) }

func (l *bufLogger) Info(vs ...interface{}) { l.report("INFO", vs) }
func (l *bufLogger) Infof(fmt string, vs ...interface{}) { l.reportf("INFO", fmt, vs) }

func (l *bufLogger) Error(vs ...interface{}) { l.report("ERROR", vs) }
func (l *bufLogger) Errorf(fmt string, vs ...interface{}) { l.reportf("ERROR", fmt, vs) }

func (l *bufLogger) report(level string, vs []interface{}) {
str := strings.TrimRight(strings.Repeat("%v ", len(vs)), " ")
l.reportf(level, str, vs)
}
func (l *bufLogger) reportf(level, str string, vs []interface{}) {
str = level + ": " + str
line := fmt.Sprintf(str, vs...)

l.mu.Lock()
defer l.mu.Unlock()
l.lines = append(l.lines, line)
}

func (l *bufLogger) Flush() {
l.mu.Lock()
defer l.mu.Unlock()

for _, line := range l.lines {
l.t.Log(line)
}

l.lines = make([]string, 0)
}

func makeBufLogger(t *testing.T) *bufLogger {
return &bufLogger{
t: t,
lines: make([]string, 0),
}
}
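
Taken together, the tests above share one pattern: build the buffered logger, pass it to newRetryer and makeClientWorker, and call Flush only when the final check fails, so the logs surface on failing runs only. A condensed sketch of that pattern follows (editor-added and hypothetical, not part of this commit; it would sit inside output_test.go, which already imports testing):

func TestBufLoggerFlushOnFailureSketch(t *testing.T) {
	logger := makeBufLogger(t)

	// These lines are buffered by bufLogger, not written to the test output yet.
	logger.Info("worker started")
	logger.Debugf("%d/%d batches acked", 299, 300)

	published, expected := 299, 300 // stand-ins for the real counters and waitUntilTrue check
	if published != expected {
		// Only on failure are the buffered lines emitted, each one through t.Log.
		logger.Flush()
		t.Logf("expected %d events, saw %d; buffered pipeline logs are above", expected, published)
	}
}

Because Flush routes every buffered line through t.Log, the output is attached to the test that failed and stays out of the way on passing runs, which is the point of the flaky-test logging requested in #18071.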
6 changes: 2 additions & 4 deletions libbeat/publisher/pipeline/retry.go
@@ -19,8 +19,6 @@ package pipeline

import (
"sync"

"github.com/elastic/beats/v7/libbeat/logp"
)

// retryer is responsible for accepting and managing failed send attempts. It
@@ -31,7 +29,7 @@ import (
// will the consumer be paused, until some batches have been processed by some
// outputs.
type retryer struct {
- logger *logp.Logger
logger logger
observer outputObserver

done chan struct{}
@@ -77,7 +75,7 @@
)

func newRetryer(
- log *logp.Logger
log logger,
observer outputObserver,
out workQueue,
c interruptor,
