From 8549b621790fbc0c72e97c5f3f76a8c7175eef63 Mon Sep 17 00:00:00 2001
From: Shaunak Kashyap
Date: Tue, 5 May 2020 09:46:00 -0700
Subject: [PATCH 1/3] Implement backoff for Kafka output (#17808)

* Expose wait duration from backoff strategies
* Add backoff section to kafka output config
* Use equal jitter backoff strategy with kafka producer
* Adding CHANGELOG entry
* Adding backoff options to reference config template
* Update reference config files
* Implementing new Backoff interface
* Adding explanation of strategy choice to comment
* Implementing new Backoff interface
* Adding godoc
* Resetting backoff
* Updating test
* Adding godoc for interface
* Adding struct tags for config settings
* Implementing stateless backoff function
* Undoing changes to backoff strategies
* Adding godoc
* Fixing backoff duration calculation
* WIP: Add unit test
* Refactor seed functionality into common package
* Fix test
* Testing evolution of backoff value over retries
* Fixing refactoring of PRNG seeding after rebase
* Better failure message from test
* Moving test utilities to internal package
* Fixing calls
* Adding config options to template
---
 CHANGELOG.next.asciidoc                       |  1 +
 auditbeat/auditbeat.reference.yml             | 11 +++++
 filebeat/filebeat.reference.yml               | 11 +++++
 heartbeat/heartbeat.reference.yml             | 11 +++++
 journalbeat/journalbeat.reference.yml         | 11 +++++
 .../config/output-kafka.reference.yml.tmpl    | 11 +++++
 libbeat/common/backoff/backoff.go             |  3 ++
 libbeat/internal/testutil/util.go             | 41 ++++++++++++++++++
 libbeat/outputs/kafka/config.go               | 42 ++++++++++++++++---
 libbeat/outputs/kafka/config_test.go          | 34 +++++++++++++++
 libbeat/publisher/pipeline/controller_test.go |  3 +-
 libbeat/publisher/pipeline/output_test.go     |  7 ++--
 libbeat/publisher/pipeline/testing.go         | 16 -------
 metricbeat/metricbeat.reference.yml           | 11 +++++
 packetbeat/packetbeat.reference.yml           | 11 +++++
 winlogbeat/winlogbeat.reference.yml           | 11 +++++
 x-pack/auditbeat/auditbeat.reference.yml      | 11 +++++
 x-pack/filebeat/filebeat.reference.yml        | 11 +++++
 x-pack/metricbeat/metricbeat.reference.yml    | 11 +++++
 x-pack/winlogbeat/winlogbeat.reference.yml    | 11 +++++
 20 files changed, 254 insertions(+), 25 deletions(-)
 create mode 100644 libbeat/internal/testutil/util.go

diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index bdeffbb4339..1649850b730 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -276,6 +276,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
 - Add keystore support for autodiscover static configurations. {pull]16306[16306]
 - Add config example of how to skip the `add_host_metadata` processor when forwarding logs. {issue}13920[13920] {pull}18153[18153]
 - When using the `decode_json_fields` processor, decoded fields are now deep-merged into existing event. {pull}17958[17958]
+- Add backoff configuration options for the Kafka output. {issue}16777[16777] {pull}17808[17808]
 
 *Auditbeat*

diff --git a/auditbeat/auditbeat.reference.yml b/auditbeat/auditbeat.reference.yml
index d0a5ee0dae3..18108a79f33 100644
--- a/auditbeat/auditbeat.reference.yml
+++ b/auditbeat/auditbeat.reference.yml
@@ -734,6 +734,17 @@ output.elasticsearch:
   # until all events are published. The default is 3.
   #max_retries: 3
 
+  # The number of seconds to wait before trying to republish to Kafka
+  # after a network error. After waiting backoff.init seconds, the Beat
+  # tries to republish. If the attempt fails, the backoff timer is increased
+  # exponentially up to backoff.max. After a successful publish, the backoff
+  # timer is reset. The default is 1s.
+  #backoff.init: 1s
+
+  # The maximum number of seconds to wait before attempting to republish to
+  # Kafka after a network error. The default is 60s.
+  #backoff.max: 60s
+
   # The maximum number of events to bulk in a single Kafka request. The default
   # is 2048.
   #bulk_max_size: 2048
diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml
index e34e85fd810..7e78ac238ae 100644
--- a/filebeat/filebeat.reference.yml
+++ b/filebeat/filebeat.reference.yml
@@ -1447,6 +1447,17 @@ output.elasticsearch:
   # until all events are published. The default is 3.
   #max_retries: 3
 
+  # The number of seconds to wait before trying to republish to Kafka
+  # after a network error. After waiting backoff.init seconds, the Beat
+  # tries to republish. If the attempt fails, the backoff timer is increased
+  # exponentially up to backoff.max. After a successful publish, the backoff
+  # timer is reset. The default is 1s.
+  #backoff.init: 1s
+
+  # The maximum number of seconds to wait before attempting to republish to
+  # Kafka after a network error. The default is 60s.
+  #backoff.max: 60s
+
   # The maximum number of events to bulk in a single Kafka request. The default
   # is 2048.
   #bulk_max_size: 2048

diff --git a/heartbeat/heartbeat.reference.yml b/heartbeat/heartbeat.reference.yml
index cd8addcc09a..1082c4a9d05 100644
--- a/heartbeat/heartbeat.reference.yml
+++ b/heartbeat/heartbeat.reference.yml
@@ -884,6 +884,17 @@ output.elasticsearch:
   # until all events are published. The default is 3.
   #max_retries: 3
 
+  # The number of seconds to wait before trying to republish to Kafka
+  # after a network error. After waiting backoff.init seconds, the Beat
+  # tries to republish. If the attempt fails, the backoff timer is increased
+  # exponentially up to backoff.max. After a successful publish, the backoff
+  # timer is reset. The default is 1s.
+  #backoff.init: 1s
+
+  # The maximum number of seconds to wait before attempting to republish to
+  # Kafka after a network error. The default is 60s.
+  #backoff.max: 60s
+
   # The maximum number of events to bulk in a single Kafka request. The default
   # is 2048.
   #bulk_max_size: 2048

diff --git a/journalbeat/journalbeat.reference.yml b/journalbeat/journalbeat.reference.yml
index 2bdf764d03b..a92ab5fa7b8 100644
--- a/journalbeat/journalbeat.reference.yml
+++ b/journalbeat/journalbeat.reference.yml
@@ -671,6 +671,17 @@ output.elasticsearch:
   # until all events are published. The default is 3.
   #max_retries: 3
 
+  # The number of seconds to wait before trying to republish to Kafka
+  # after a network error. After waiting backoff.init seconds, the Beat
+  # tries to republish. If the attempt fails, the backoff timer is increased
+  # exponentially up to backoff.max. After a successful publish, the backoff
+  # timer is reset. The default is 1s.
+  #backoff.init: 1s
+
+  # The maximum number of seconds to wait before attempting to republish to
+  # Kafka after a network error. The default is 60s.
+  #backoff.max: 60s
+
   # The maximum number of events to bulk in a single Kafka request. The default
   # is 2048.
   #bulk_max_size: 2048

diff --git a/libbeat/_meta/config/output-kafka.reference.yml.tmpl b/libbeat/_meta/config/output-kafka.reference.yml.tmpl
index 50efb87fbb1..87b24c8d72e 100644
--- a/libbeat/_meta/config/output-kafka.reference.yml.tmpl
+++ b/libbeat/_meta/config/output-kafka.reference.yml.tmpl
@@ -70,6 +70,17 @@
   # until all events are published. The default is 3.
   #max_retries: 3
 
+  # The number of seconds to wait before trying to republish to Kafka
+  # after a network error. After waiting backoff.init seconds, the Beat
+  # tries to republish. If the attempt fails, the backoff timer is increased
+  # exponentially up to backoff.max. After a successful publish, the backoff
+  # timer is reset. The default is 1s.
+  #backoff.init: 1s
+
+  # The maximum number of seconds to wait before attempting to republish to
+  # Kafka after a network error. The default is 60s.
+  #backoff.max: 60s
+
   # The maximum number of events to bulk in a single Kafka request. The default
   # is 2048.
   #bulk_max_size: 2048
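For reference, the schedule these comments describe looks like the following Go sketch. It is illustrative only, not part of the patch (jitter is omitted, and the names initial/maxWait are invented for the illustration): the delay starts at backoff.init, doubles after each failed publish attempt, is capped at backoff.max, and a successful publish resets it to backoff.init.

package main

import (
	"fmt"
	"time"
)

func main() {
	initial := 1 * time.Second  // backoff.init
	maxWait := 60 * time.Second // backoff.max

	delay := initial
	for attempt := 1; attempt <= 8; attempt++ {
		fmt.Printf("failed attempt %d: wait %v before retrying\n", attempt, delay)
		// double the delay for the next failure, capped at backoff.max
		delay *= 2
		if delay > maxWait {
			delay = maxWait
		}
	}
	// Prints 1s, 2s, 4s, 8s, 16s, 32s, 1m0s, 1m0s.
}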
diff --git a/libbeat/common/backoff/backoff.go b/libbeat/common/backoff/backoff.go
index a027fb51a46..5439e38cfbb 100644
--- a/libbeat/common/backoff/backoff.go
+++ b/libbeat/common/backoff/backoff.go
@@ -19,7 +19,10 @@ package backoff
 
 // Backoff defines the interface for backoff strategies.
 type Backoff interface {
+	// Wait blocks for a duration of time governed by the backoff strategy.
 	Wait() bool
+
+	// Reset resets the backoff duration to an initial value governed by the backoff strategy.
 	Reset()
 }

diff --git a/libbeat/internal/testutil/util.go b/libbeat/internal/testutil/util.go
new file mode 100644
index 00000000000..51a811587e5
--- /dev/null
+++ b/libbeat/internal/testutil/util.go
@@ -0,0 +1,41 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This file contains commonly-used utility functions for testing.
+
+package testutil
+
+import (
+	"flag"
+	"math/rand"
+	"testing"
+	"time"
+)
+
+var (
+	SeedFlag = flag.Int64("seed", 0, "Randomization seed")
+)
+
+func SeedPRNG(t *testing.T) {
+	seed := *SeedFlag
+	if seed == 0 {
+		seed = time.Now().UnixNano()
+	}
+
+	t.Logf("reproduce test with `go test ... -seed %v`", seed)
+	rand.Seed(seed)
+}
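SeedPRNG is meant to be called at the top of any test that depends on math/rand: it seeds the PRNG from the -seed flag, or from the clock when the flag is unset, and logs the seed so a failing run can be replayed with `go test ... -seed <value>`. A minimal sketch of that usage, assuming a test in the kafka package (the test name is hypothetical):

package kafka

import (
	"testing"

	"github.com/elastic/beats/v7/libbeat/internal/testutil"
)

func TestJitterIsReproducible(t *testing.T) {
	// Seed math/rand and log the seed for reproducibility.
	testutil.SeedPRNG(t)

	// ...exercise randomized behavior here, e.g. jittered backoff durations...
}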
diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go
index baf6867b4f7..0df045565aa 100644
--- a/libbeat/outputs/kafka/config.go
+++ b/libbeat/outputs/kafka/config.go
@@ -20,6 +20,8 @@ package kafka
 import (
 	"errors"
 	"fmt"
+	"math"
+	"math/rand"
 	"strings"
 	"time"
@@ -37,6 +39,11 @@ import (
 	"github.com/elastic/beats/v7/libbeat/outputs/codec"
 )
 
+type backoffConfig struct {
+	Init time.Duration `config:"init"`
+	Max  time.Duration `config:"max"`
+}
+
 type kafkaConfig struct {
 	Hosts []string          `config:"hosts" validate:"required"`
 	TLS   *tlscommon.Config `config:"ssl"`
@@ -55,6 +62,7 @@ type kafkaConfig struct {
 	BulkMaxSize        int           `config:"bulk_max_size"`
 	BulkFlushFrequency time.Duration `config:"bulk_flush_frequency"`
 	MaxRetries         int           `config:"max_retries" validate:"min=-1,nonzero"`
+	Backoff            backoffConfig `config:"backoff"`
 	ClientID           string        `config:"client_id"`
 	ChanBufferSize     int           `config:"channel_buffer_size" validate:"min=1"`
 	Username           string        `config:"username"`
@@ -106,10 +114,14 @@ func defaultConfig() kafkaConfig {
 		CompressionLevel: 4,
 		Version:          kafka.Version("1.0.0"),
 		MaxRetries:       3,
-		ClientID:         "beats",
-		ChanBufferSize:   256,
-		Username:         "",
-		Password:         "",
+		Backoff: backoffConfig{
+			Init: 1 * time.Second,
+			Max:  60 * time.Second,
+		},
+		ClientID:       "beats",
+		ChanBufferSize: 256,
+		Username:       "",
+		Password:       "",
 	}
 }
@@ -225,7 +237,7 @@ func newSaramaConfig(log *logp.Logger, config *kafkaConfig) (*sarama.Config, err
 		retryMax = 1000
 	}
 	k.Producer.Retry.Max = retryMax
-	// TODO: k.Producer.Retry.Backoff = ?
+	k.Producer.Retry.BackoffFunc = makeBackoffFunc(config.Backoff)
 
 	// configure per broker go channel buffering
 	k.ChannelBufferSize = config.ChanBufferSize
@@ -260,3 +272,23 @@ func newSaramaConfig(log *logp.Logger, config *kafkaConfig) (*sarama.Config, err
 	}
 	return k, nil
 }
+
+// makeBackoffFunc returns a stateless implementation of exponential-backoff-with-jitter. It is conceptually
+// equivalent to the stateful implementation used by other outputs, EqualJitterBackoff.
+func makeBackoffFunc(cfg backoffConfig) func(retries, maxRetries int) time.Duration {
+	maxBackoffRetries := int(math.Ceil(math.Log2(float64(cfg.Max) / float64(cfg.Init))))
+
+	return func(retries, _ int) time.Duration {
+		// compute 'base' duration for exponential backoff
+		dur := cfg.Max
+		if retries < maxBackoffRetries {
+			dur = time.Duration(uint64(cfg.Init) * uint64(1<<retries))
+		}
+
+		// apply equally distributed jitter in the second half of the interval, such that the wait
+		// time falls into the interval [dur/2, dur]
+		limit := int64(dur / 2)
+		jitter := rand.Int63n(limit + 1)
+
+		return time.Duration(limit + jitter)
+	}
+}

From: Shaunak Kashyap
Date: Tue, 5 May 2020 10:50:41 -0700
Subject: [PATCH 2/3] Fixing up CHANGELOG

---
 CHANGELOG.next.asciidoc | 1 -
 1 file changed, 1 deletion(-)

diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 1649850b730..1d25388dbf2 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -275,7 +275,6 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
 - Set `agent.name` to the hostname by default. {issue}16377[16377] {pull}18000[18000]
 - Add keystore support for autodiscover static configurations. {pull]16306[16306]
 - Add config example of how to skip the `add_host_metadata` processor when forwarding logs. {issue}13920[13920] {pull}18153[18153]
-- When using the `decode_json_fields` processor, decoded fields are now deep-merged into existing event. {pull}17958[17958]
 - Add backoff configuration options for the Kafka output. {issue}16777[16777] {pull}17808[17808]
 
 *Auditbeat*
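To see what the producer will actually wait, the BackoffFunc returned by makeBackoffFunc (added to config.go above) can be probed directly. A sketch, not from the patch, assuming it runs inside the kafka package (demoBackoffFunc is an invented name): with init=1s and max=60s, maxBackoffRetries is ceil(log2(60)) = 6, so retries 0 through 5 use a base of init<<retries and every later retry uses the 60s cap; each wait is then drawn from [base/2, base].

package kafka

import (
	"fmt"
	"time"
)

func demoBackoffFunc() {
	backoffFn := makeBackoffFunc(backoffConfig{
		Init: 1 * time.Second,
		Max:  60 * time.Second,
	})

	for retries := 0; retries < 8; retries++ {
		// The second argument (sarama's maxRetries) is ignored by this implementation.
		fmt.Printf("retries=%d wait=%v\n", retries, backoffFn(retries, 1000))
	}
}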
From 8fba3b29d62fea3b67c58b21571e296650f6ae77 Mon Sep 17 00:00:00 2001
From: Shaunak Kashyap
Date: Tue, 5 May 2020 10:57:05 -0700
Subject: [PATCH 3/3] Fixing up CHANGELOG

---
 CHANGELOG.next.asciidoc | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 1d25388dbf2..1649850b730 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -275,6 +275,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
 - Set `agent.name` to the hostname by default. {issue}16377[16377] {pull}18000[18000]
 - Add keystore support for autodiscover static configurations. {pull]16306[16306]
 - Add config example of how to skip the `add_host_metadata` processor when forwarding logs. {issue}13920[13920] {pull}18153[18153]
+- When using the `decode_json_fields` processor, decoded fields are now deep-merged into existing event. {pull}17958[17958]
 - Add backoff configuration options for the Kafka output. {issue}16777[16777] {pull}17808[17808]
 
 *Auditbeat*
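Taken together, the new settings round-trip through libbeat's config unpacking into kafkaConfig. A sketch of the kind of check this enables, assuming it sits in the kafka package next to config.go (the test name and values are illustrative, not taken from the patch):

package kafka

import (
	"testing"
	"time"

	"github.com/elastic/beats/v7/libbeat/common"
)

func TestBackoffSettingsUnpack(t *testing.T) {
	cfg := defaultConfig()
	// Build a config as a user would write it in YAML; hosts is required by
	// kafkaConfig's validation, so it must be present for Unpack to succeed.
	userCfg, err := common.NewConfigFrom(map[string]interface{}{
		"hosts":        []string{"localhost:9092"},
		"backoff.init": "2s",
		"backoff.max":  "30s",
	})
	if err != nil {
		t.Fatal(err)
	}
	if err := userCfg.Unpack(&cfg); err != nil {
		t.Fatal(err)
	}
	if cfg.Backoff.Init != 2*time.Second || cfg.Backoff.Max != 30*time.Second {
		t.Fatalf("unexpected backoff settings: %+v", cfg.Backoff)
	}
}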