[7.x] Implement backoff for Kafka output (#17808) (#18260)
* Implement backoff for Kafka output (#17808)

* Expose wait duration from backoff strategies

* Add backoff section to kafka output config

* Use equal jitter backoff strategy with kafka producer

* Adding CHANGELOG entry

* Adding backoff options to reference config template

* Update reference config files

* Implementing new Backoff interface

* Adding explanation of strategy choice to comment

* Implementing new Backoff interface

* Adding godoc

* Resetting backoff

* Updating test

* Adding godoc for interface

* Adding struct tags for config settings

* Implementing stateless backoff function

* Undoing changes to backoff strategies

* Adding godoc

* Fixing backoff duration calculation

* WIP: Add unit test

* Refactor seed functionality into common package

* Fix test

* Testing evolution of backoff value over retries

* Fixing refactoring of PRNG seeding after rebase

* Better failure message from test

* Moving test utilities to internal package

* Fixing calls

* Adding config options to template

* Fixing up CHANGELOG

* Fixing up CHANGELOG
ycombinator authored May 5, 2020
1 parent 553c92e commit e0078f2
Showing 20 changed files with 254 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -277,6 +277,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add support for basic ECS logging. {pull}17974[17974]
- Add config example of how to skip the `add_host_metadata` processor when forwarding logs. {issue}13920[13920] {pull}18153[18153]
- When using the `decode_json_fields` processor, decoded fields are now deep-merged into existing event. {pull}17958[17958]
- Add backoff configuration options for the Kafka output. {issue}16777[16777] {pull}17808[17808]

*Auditbeat*

11 changes: 11 additions & 0 deletions auditbeat/auditbeat.reference.yml
@@ -734,6 +734,17 @@ output.elasticsearch:
# until all events are published. The default is 3.
#max_retries: 3

# The number of seconds to wait before trying to republish to Kafka
# after a network error. After waiting backoff.init seconds, the Beat
# tries to republish. If the attempt fails, the backoff timer is increased
# exponentially up to backoff.max. After a successful publish, the backoff
# timer is reset. The default is 1s.
#backoff.init: 1s

# The maximum number of seconds to wait before attempting to republish to
# Kafka after a network error. The default is 60s.
#backoff.max: 60s

# The maximum number of events to bulk in a single Kafka request. The default
# is 2048.
#bulk_max_size: 2048
11 changes: 11 additions & 0 deletions filebeat/filebeat.reference.yml
@@ -1447,6 +1447,17 @@ output.elasticsearch:
# until all events are published. The default is 3.
#max_retries: 3

# The number of seconds to wait before trying to republish to Kafka
# after a network error. After waiting backoff.init seconds, the Beat
# tries to republish. If the attempt fails, the backoff timer is increased
# exponentially up to backoff.max. After a successful publish, the backoff
# timer is reset. The default is 1s.
#backoff.init: 1s

# The maximum number of seconds to wait before attempting to republish to
# Kafka after a network error. The default is 60s.
#backoff.max: 60s

# The maximum number of events to bulk in a single Kafka request. The default
# is 2048.
#bulk_max_size: 2048
11 changes: 11 additions & 0 deletions heartbeat/heartbeat.reference.yml
@@ -884,6 +884,17 @@ output.elasticsearch:
# until all events are published. The default is 3.
#max_retries: 3

# The number of seconds to wait before trying to republish to Kafka
# after a network error. After waiting backoff.init seconds, the Beat
# tries to republish. If the attempt fails, the backoff timer is increased
# exponentially up to backoff.max. After a successful publish, the backoff
# timer is reset. The default is 1s.
#backoff.init: 1s

# The maximum number of seconds to wait before attempting to republish to
# Kafka after a network error. The default is 60s.
#backoff.max: 60s

# The maximum number of events to bulk in a single Kafka request. The default
# is 2048.
#bulk_max_size: 2048
11 changes: 11 additions & 0 deletions journalbeat/journalbeat.reference.yml
@@ -671,6 +671,17 @@ output.elasticsearch:
# until all events are published. The default is 3.
#max_retries: 3

# The number of seconds to wait before trying to republish to Kafka
# after a network error. After waiting backoff.init seconds, the Beat
# tries to republish. If the attempt fails, the backoff timer is increased
# exponentially up to backoff.max. After a successful publish, the backoff
# timer is reset. The default is 1s.
#backoff.init: 1s

# The maximum number of seconds to wait before attempting to republish to
# Kafka after a network error. The default is 60s.
#backoff.max: 60s

# The maximum number of events to bulk in a single Kafka request. The default
# is 2048.
#bulk_max_size: 2048
11 changes: 11 additions & 0 deletions libbeat/_meta/config/output-kafka.reference.yml.tmpl
@@ -70,6 +70,17 @@
# until all events are published. The default is 3.
#max_retries: 3

# The number of seconds to wait before trying to republish to Kafka
# after a network error. After waiting backoff.init seconds, the Beat
# tries to republish. If the attempt fails, the backoff timer is increased
# exponentially up to backoff.max. After a successful publish, the backoff
# timer is reset. The default is 1s.
#backoff.init: 1s

# The maximum number of seconds to wait before attempting to republish to
# Kafka after a network error. The default is 60s.
#backoff.max: 60s

# The maximum number of events to bulk in a single Kafka request. The default
# is 2048.
#bulk_max_size: 2048
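For example, a user could enable a gentler retry pattern by setting both options under the Kafka output in a Beat's configuration. A minimal sketch (the host and topic values are placeholders, not part of this commit):

output.kafka:
  hosts: ["kafka1:9092"]
  topic: "example-topic"
  backoff.init: 2s
  backoff.max: 30s

With these values the Beat waits about 2s before its first republish attempt after a network error, with waits growing exponentially up to the 30s cap.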
3 changes: 3 additions & 0 deletions libbeat/common/backoff/backoff.go
@@ -19,7 +19,10 @@ package backoff

// Backoff defines the interface for backoff strategies.
type Backoff interface {
// Wait blocks for a duration of time governed by the backoff strategy.
Wait() bool

// Reset resets the backoff duration to an initial value governed by the backoff strategy.
Reset()
}

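For illustration, a minimal strategy satisfying this interface could look like the sketch below. simpleBackoff is hypothetical and not part of this commit; the package's real strategies (e.g. EqualJitterBackoff, mentioned later in this change) live alongside this file:

package backoff

import "time"

// simpleBackoff is a hypothetical exponential strategy satisfying Backoff.
type simpleBackoff struct {
	duration, init, max time.Duration
}

// Wait sleeps for the current duration, then doubles it up to the cap.
// A real implementation would return false when told to stop waiting.
func (b *simpleBackoff) Wait() bool {
	time.Sleep(b.duration)
	b.duration *= 2
	if b.duration > b.max {
		b.duration = b.max
	}
	return true
}

// Reset restores the backoff duration to its initial value.
func (b *simpleBackoff) Reset() { b.duration = b.init }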
41 changes: 41 additions & 0 deletions libbeat/internal/testutil/util.go
@@ -0,0 +1,41 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// This file contains commonly-used utility functions for testing.

package testutil

import (
"flag"
"math/rand"
"testing"
"time"
)

var (
SeedFlag = flag.Int64("seed", 0, "Randomization seed")
)

func SeedPRNG(t *testing.T) {
seed := *SeedFlag
if seed == 0 {
seed = time.Now().UnixNano()
}

t.Logf("reproduce test with `go test ... -seed %v`", seed)
rand.Seed(seed)
}
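A test seeds the PRNG once at the top; when it fails, the logged seed can be replayed to reproduce the run. A hypothetical caller (the test name and seed value below are illustrative, not part of this commit):

package kafka

import (
	"testing"

	"github.com/elastic/beats/v7/libbeat/internal/testutil"
)

func TestWithRandomness(t *testing.T) {
	testutil.SeedPRNG(t) // logs: reproduce test with `go test ... -seed <n>`
	// ... assertions that draw from math/rand ...
}

// Replay a specific failure:
//   go test ./libbeat/outputs/kafka/ -run TestWithRandomness -seed 1588646400000000000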
42 changes: 37 additions & 5 deletions libbeat/outputs/kafka/config.go
@@ -20,6 +20,8 @@ package kafka
import (
"errors"
"fmt"
"math"
"math/rand"
"strings"
"time"

@@ -37,6 +39,11 @@
"github.com/elastic/beats/v7/libbeat/outputs/codec"
)

type backoffConfig struct {
Init time.Duration `config:"init"`
Max time.Duration `config:"max"`
}

type kafkaConfig struct {
Hosts []string `config:"hosts" validate:"required"`
TLS *tlscommon.Config `config:"ssl"`
Expand All @@ -55,6 +62,7 @@ type kafkaConfig struct {
BulkMaxSize int `config:"bulk_max_size"`
BulkFlushFrequency time.Duration `config:"bulk_flush_frequency"`
MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"`
Backoff backoffConfig `config:"backoff"`
ClientID string `config:"client_id"`
ChanBufferSize int `config:"channel_buffer_size" validate:"min=1"`
Username string `config:"username"`
@@ -106,10 +114,14 @@ func defaultConfig() kafkaConfig {
CompressionLevel: 4,
Version: kafka.Version("1.0.0"),
MaxRetries: 3,
ClientID: "beats",
ChanBufferSize: 256,
Username: "",
Password: "",
Backoff: backoffConfig{
Init: 1 * time.Second,
Max: 60 * time.Second,
},
ClientID: "beats",
ChanBufferSize: 256,
Username: "",
Password: "",
}
}

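These defaults form the base onto which user settings are unpacked, so any field left unset keeps its default. A hypothetical sketch of that flow, assuming it runs inside a function returning error (the literal values are illustrative):

userCfg, err := common.NewConfigFrom(map[string]interface{}{
	"hosts":   []string{"localhost:9092"},
	"backoff": map[string]interface{}{"init": "2s", "max": "30s"},
})
if err != nil {
	return err
}
cfg := defaultConfig()
if err := userCfg.Unpack(&cfg); err != nil {
	return err
}
// cfg.Backoff.Init == 2s and cfg.Backoff.Max == 30s; untouched fields such as
// cfg.ClientID keep their defaults ("beats").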
@@ -225,7 +237,7 @@ func newSaramaConfig(log *logp.Logger, config *kafkaConfig) (*sarama.Config, err
retryMax = 1000
}
k.Producer.Retry.Max = retryMax
// TODO: k.Producer.Retry.Backoff = ?
k.Producer.Retry.BackoffFunc = makeBackoffFunc(config.Backoff)

// configure per broker go channel buffering
k.ChannelBufferSize = config.ChanBufferSize
@@ -260,3 +272,23 @@ func newSaramaConfig(log *logp.Logger, config *kafkaConfig) (*sarama.Config, err
}
return k, nil
}

// makeBackoffFunc returns a stateless implementation of exponential-backoff-with-jitter. It is conceptually
// equivalent to the stateful implementation used by other outputs, EqualJitterBackoff.
func makeBackoffFunc(cfg backoffConfig) func(retries, maxRetries int) time.Duration {
maxBackoffRetries := int(math.Ceil(math.Log2(float64(cfg.Max) / float64(cfg.Init))))

return func(retries, _ int) time.Duration {
// compute 'base' duration for exponential backoff
dur := cfg.Max
if retries < maxBackoffRetries {
dur = time.Duration(uint64(cfg.Init) * uint64(1<<retries))
}

// apply approximately equally distributed jitter in the second half of the
// interval, such that the wait time falls into the interval [dur/2, dur]
limit := int64(dur / 2)
jitter := rand.Int63n(limit + 1)
return time.Duration(limit + jitter)
}
}
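As a worked example with the defaults (Init: 1s, Max: 60s), maxBackoffRetries = ceil(log2(60)) = 6: the base duration doubles through 1s, 2s, 4s, ..., 32s for retries 0-5, then stays pinned at 60s, and each actual wait is drawn uniformly from [base/2, base]. A hypothetical same-package snippet tracing this (assumes fmt is imported; not part of the commit):

func ExampleBackoffEvolution() {
	fn := makeBackoffFunc(backoffConfig{Init: time.Second, Max: 60 * time.Second})
	for retries := 0; retries <= 7; retries++ {
		// retries 0..5: base = 1s << retries; retries >= 6: base = 60s.
		fmt.Printf("retry %d: wait %v (uniform in [base/2, base])\n", retries, fn(retries, 1000))
	}
}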
34 changes: 34 additions & 0 deletions libbeat/outputs/kafka/config_test.go
@@ -18,9 +18,13 @@
package kafka

import (
"fmt"
"math"
"testing"
"time"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/internal/testutil"
"github.com/elastic/beats/v7/libbeat/logp"
)

@@ -97,3 +101,33 @@ func TestConfigInvalid(t *testing.T) {
})
}
}

func TestBackoffFunc(t *testing.T) {
testutil.SeedPRNG(t)
tests := map[int]backoffConfig{
15: {Init: 1 * time.Second, Max: 60 * time.Second},
7: {Init: 2 * time.Second, Max: 20 * time.Second},
4: {Init: 5 * time.Second, Max: 7 * time.Second},
}

for numRetries, backoffCfg := range tests {
t.Run(fmt.Sprintf("%v_retries", numRetries), func(t *testing.T) {
backoffFn := makeBackoffFunc(backoffCfg)

prevBackoff := backoffCfg.Init
for retries := 1; retries <= numRetries; retries++ {
backoff := prevBackoff * 2

expectedBackoff := math.Min(float64(backoff), float64(backoffCfg.Max))
actualBackoff := backoffFn(retries, 50)

if !((expectedBackoff/2 <= float64(actualBackoff)) && (float64(actualBackoff) <= expectedBackoff)) {
t.Fatalf("backoff '%v' not in expected range [%v, %v] (retries: %v)", actualBackoff, expectedBackoff/2, expectedBackoff, retries)
}

prevBackoff = backoff
}

})
}
}
3 changes: 2 additions & 1 deletion libbeat/publisher/pipeline/controller_test.go
@@ -25,6 +25,7 @@ import (

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/internal/testutil"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/publisher"
@@ -43,7 +44,7 @@ func TestOutputReload(t *testing.T) {

for name, ctor := range tests {
t.Run(name, func(t *testing.T) {
seedPRNG(t)
testutil.SeedPRNG(t)

goroutines := resources.NewGoroutinesChecker()
defer goroutines.Check(t)
7 changes: 4 additions & 3 deletions libbeat/publisher/pipeline/output_test.go
@@ -29,6 +29,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/internal/testutil"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/publisher"
@@ -42,7 +43,7 @@ func TestMakeClientWorker(t *testing.T) {

for name, ctor := range tests {
t.Run(name, func(t *testing.T) {
seedPRNG(t)
testutil.SeedPRNG(t)

err := quick.Check(func(i uint) bool {
numBatches := 300 + (i % 100) // between 300 and 399
@@ -96,7 +97,7 @@ func TestReplaceClientWorker(t *testing.T) {

for name, ctor := range tests {
t.Run(name, func(t *testing.T) {
seedPRNG(t)
testutil.SeedPRNG(t)

err := quick.Check(func(i uint) bool {
numBatches := 1000 + (i % 100) // between 1000 and 1099
@@ -182,7 +183,7 @@ }
}

func TestMakeClientTracer(t *testing.T) {
seedPRNG(t)
testutil.SeedPRNG(t)

numBatches := 10
numEvents := atomic.MakeUint(0)
16 changes: 0 additions & 16 deletions libbeat/publisher/pipeline/testing.go
@@ -19,21 +19,15 @@ package pipeline

import (
"context"
"flag"
"math/rand"
"sync"
"testing"
"time"

"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

var (
SeedFlag = flag.Int64("seed", 0, "Randomization seed")
)

type mockPublishFn func(publisher.Batch) error

func newMockClient(publishFn mockPublishFn) outputs.Client {
@@ -158,16 +152,6 @@ func randIntBetween(min, max int) int {
return rand.Intn(max-min) + min
}

func seedPRNG(t *testing.T) {
seed := *SeedFlag
if seed == 0 {
seed = time.Now().UnixNano()
}

t.Logf("reproduce test with `go test ... -seed %v`", seed)
rand.Seed(seed)
}

func waitUntilTrue(duration time.Duration, fn func() bool) bool {
end := time.Now().Add(duration)
for time.Now().Before(end) {
11 changes: 11 additions & 0 deletions metricbeat/metricbeat.reference.yml
@@ -1499,6 +1499,17 @@ output.elasticsearch:
# until all events are published. The default is 3.
#max_retries: 3

# The number of seconds to wait before trying to republish to Kafka
# after a network error. After waiting backoff.init seconds, the Beat
# tries to republish. If the attempt fails, the backoff timer is increased
# exponentially up to backoff.max. After a successful publish, the backoff
# timer is reset. The default is 1s.
#backoff.init: 1s

# The maximum number of seconds to wait before attempting to republish to
# Kafka after a network error. The default is 60s.
#backoff.max: 60s

# The maximum number of events to bulk in a single Kafka request. The default
# is 2048.
#bulk_max_size: 2048