
Merge pull request elastic#7819 from urso/backport_7781_6.4
Cherry-pick elastic#7781 to 6.4: Add backoff support to the redis output
andrewvc authored Jul 31, 2018
2 parents a70065b + 689f233 commit bc7bd2a
Showing 14 changed files with 221 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
@@ -30,6 +30,7 @@ https://github.com/elastic/beats/compare/v6.4.0...6.4[Check the HEAD diff]
*Affecting all Beats*

- Fixed `add_host_metadata` not initializing correctly on Windows. {issue}7715[7715]
- Add backoff on error support to redis output. {pull}7781[7781]

*Auditbeat*

11 changes: 11 additions & 0 deletions auditbeat/auditbeat.reference.yml
@@ -731,6 +731,17 @@ output.elasticsearch:
# until all events are published. The default is 3.
#max_retries: 3

# The number of seconds to wait before trying to reconnect to Redis
# after a network error. After waiting backoff.init seconds, the Beat
# tries to reconnect. If the attempt fails, the backoff timer is increased
# exponentially up to backoff.max. After a successful connection, the backoff
# timer is reset. The default is 1s.
#backoff.init: 1s

# The maximum number of seconds to wait before attempting to connect to
# Redis after a network error. The default is 60s.
#backoff.max: 60s

# The maximum number of events to bulk in a single Redis request or pipeline.
# The default is 2048.
#bulk_max_size: 2048
11 changes: 11 additions & 0 deletions filebeat/filebeat.reference.yml
@@ -1391,6 +1391,17 @@ output.elasticsearch:
# until all events are published. The default is 3.
#max_retries: 3

# The number of seconds to wait before trying to reconnect to Redis
# after a network error. After waiting backoff.init seconds, the Beat
# tries to reconnect. If the attempt fails, the backoff timer is increased
# exponentially up to backoff.max. After a successful connection, the backoff
# timer is reset. The default is 1s.
#backoff.init: 1s

# The maximum number of seconds to wait before attempting to connect to
# Redis after a network error. The default is 60s.
#backoff.max: 60s

# The maximum number of events to bulk in a single Redis request or pipeline.
# The default is 2048.
#bulk_max_size: 2048
11 changes: 11 additions & 0 deletions heartbeat/heartbeat.reference.yml
@@ -838,6 +838,17 @@ output.elasticsearch:
# until all events are published. The default is 3.
#max_retries: 3

# The number of seconds to wait before trying to reconnect to Redis
# after a network error. After waiting backoff.init seconds, the Beat
# tries to reconnect. If the attempt fails, the backoff timer is increased
# exponentially up to backoff.max. After a successful connection, the backoff
# timer is reset. The default is 1s.
#backoff.init: 1s

# The maximum number of seconds to wait before attempting to connect to
# Redis after a network error. The default is 60s.
#backoff.max: 60s

# The maximum number of events to bulk in a single Redis request or pipeline.
# The default is 2048.
#bulk_max_size: 2048
11 changes: 11 additions & 0 deletions libbeat/_meta/config.reference.yml
@@ -624,6 +624,17 @@ output.elasticsearch:
# until all events are published. The default is 3.
#max_retries: 3

# The number of seconds to wait before trying to reconnect to Redis
# after a network error. After waiting backoff.init seconds, the Beat
# tries to reconnect. If the attempt fails, the backoff timer is increased
# exponentially up to backoff.max. After a successful connection, the backoff
# timer is reset. The default is 1s.
#backoff.init: 1s

# The maximum number of seconds to wait before attempting to connect to
# Redis after a network error. The default is 60s.
#backoff.max: 60s

# The maximum number of events to bulk in a single Redis request or pipeline.
# The default is 2048.
#bulk_max_size: 2048
13 changes: 13 additions & 0 deletions libbeat/docs/outputconfig.asciidoc
@@ -1033,6 +1033,19 @@ to another host if the currently selected one becomes unreachable. The default v

The Redis connection timeout in seconds. The default is 5 seconds.

===== `backoff.init`

The number of seconds to wait before trying to reconnect to Redis after
a network error. After waiting `backoff.init` seconds, {beatname_uc} tries to
reconnect. If the attempt fails, the backoff timer is increased exponentially up
to `backoff.max`. After a successful connection, the backoff timer is reset. The
default is 1s.

===== `backoff.max`

The maximum number of seconds to wait before attempting to connect to
Redis after a network error. The default is 60s.

===== `max_retries`

ifeval::[("{beatname_lc}"=="filebeat") or ("{beatname_lc}"=="winlogbeat")]
114 changes: 114 additions & 0 deletions libbeat/outputs/redis/backoff.go
@@ -0,0 +1,114 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package redis

import (
"time"

"github.com/garyburd/redigo/redis"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/publisher"
)

type backoffClient struct {
client *client

reason failReason

done chan struct{}
backoff *common.Backoff
}

// failReason is used to track the cause of an error.
// The redis client forces us to reconnect on any error (even for redis
// internal errors). The backoff timer must not be reset on a successful
// reconnect after publishing failed with a redis internal
// error (e.g. OutOfMemory), so we can still guarantee the backoff duration
// increases exponentially.
type failReason uint8

const (
failNone failReason = iota
failRedis
failOther
)

func newBackoffClient(client *client, init, max time.Duration) *backoffClient {
done := make(chan struct{})
backoff := common.NewBackoff(done, init, max)
return &backoffClient{
client: client,
done: done,
backoff: backoff,
}
}

func (b *backoffClient) Connect() error {
err := b.client.Connect()
if err != nil {
// give the client a chance to promote an internal error to a network error.
b.updateFailReason(err)
b.backoff.Wait()
} else if b.reason != failRedis { // Only reset backoff duration if failure was due to IO errors.
b.resetFail()
}

return err
}

func (b *backoffClient) Close() error {
err := b.client.Close()
close(b.done)
return err
}

func (b *backoffClient) Publish(batch publisher.Batch) error {
err := b.client.Publish(batch)
if err != nil {
b.client.Close()
b.updateFailReason(err)
b.backoff.Wait()
} else {
b.resetFail()
}
return err
}

func (b *backoffClient) updateFailReason(err error) {
if b.reason == failRedis {
	// we only allow 'Publish' to recover from a redis internal error
return
}

if err == nil {
b.reason = failNone
return
}

if _, ok := err.(redis.Error); ok {
b.reason = failRedis
} else {
b.reason = failOther
}
}

func (b *backoffClient) resetFail() {
b.reason = failNone
b.backoff.Reset()
}
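
For context, here is a minimal standalone Go sketch of the reconnect schedule this wrapper implements. It is illustrative only, not the libbeat common.Backoff code: the wait starts at backoff.init, grows after each failed attempt (assumed here to double, since the docs only promise exponential growth), is capped at backoff.max, and is reset after a success.

package main

import (
	"fmt"
	"time"
)

// sketchBackoff is an illustrative stand-in for libbeat's common.Backoff.
type sketchBackoff struct {
	init, max, current time.Duration
}

func newSketchBackoff(init, max time.Duration) *sketchBackoff {
	return &sketchBackoff{init: init, max: max, current: init}
}

// Next returns the wait before the next reconnect attempt and doubles the
// stored duration, capped at max.
func (b *sketchBackoff) Next() time.Duration {
	d := b.current
	b.current *= 2
	if b.current > b.max {
		b.current = b.max
	}
	return d
}

// Reset restores the initial wait, mirroring what the backoff client does
// after a successful connect or publish.
func (b *sketchBackoff) Reset() { b.current = b.init }

func main() {
	b := newSketchBackoff(1*time.Second, 60*time.Second) // the defaults in this PR
	for i := 1; i <= 8; i++ {
		fmt.Printf("failed attempt %d: wait %v\n", i, b.Next())
	}
	b.Reset()
	fmt.Printf("after success, next wait: %v\n", b.Next())
}

With the default settings this prints waits of 1s, 2s, 4s, 8s, 16s, 32s and then 1m0s until the connection recovers, after which the wait drops back to 1s.
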
2 changes: 1 addition & 1 deletion libbeat/outputs/redis/client.go
@@ -228,7 +228,7 @@ func (c *client) publishEventsBulk(conn redis.Conn, command string) publishFn {
// RPUSH returns total length of list -> fail and retry all on error
_, err := conn.Do(command, args...)
if err != nil {
logp.Err("Failed to %v to redis list with %v", command, err)
logp.Err("Failed to %v to redis list with: %v", command, err)
return okEvents, err

}
10 changes: 10 additions & 0 deletions libbeat/outputs/redis/config.go
@@ -40,6 +40,12 @@ type redisConfig struct {
Codec codec.Config `config:"codec"`
Db int `config:"db"`
DataType string `config:"datatype"`
Backoff backoff `config:"backoff"`
}

type backoff struct {
Init time.Duration
Max time.Duration
}

var (
@@ -52,6 +58,10 @@ var (
TLS: nil,
Db: 0,
DataType: "list",
Backoff: backoff{
Init: 1 * time.Second,
Max: 60 * time.Second,
},
}
)

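
As a sanity check on these defaults, a hedged sketch of how a user-supplied backoff section unpacks into a struct like the one above via common.NewConfigFrom plus Unpack (the same construction the integration test below uses; the Unpack call and field-name matching are assumptions about go-ucfg's usual behavior, not something shown in this diff):

package main

import (
	"fmt"
	"time"

	"github.com/elastic/beats/libbeat/common"
)

// backoffSettings mirrors the backoff struct added above; with no struct
// tags, go-ucfg is assumed to match the lowercased field names "init" and "max".
type backoffSettings struct {
	Init time.Duration
	Max  time.Duration
}

func main() {
	// A user-supplied output.redis backoff section.
	cfg, err := common.NewConfigFrom(map[string]interface{}{
		"init": "2s",
		"max":  "30s",
	})
	if err != nil {
		panic(err)
	}

	// Start from the defaults set in defaultConfig above (1s / 60s).
	settings := backoffSettings{Init: 1 * time.Second, Max: 60 * time.Second}
	if err := cfg.Unpack(&settings); err != nil {
		panic(err)
	}

	fmt.Println(settings.Init, settings.Max) // expected: 2s 30s
}
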
3 changes: 2 additions & 1 deletion libbeat/outputs/redis/redis.go
@@ -121,8 +121,9 @@ func makeRedis(
return outputs.Fail(err)
}

clients[i] = newClient(conn, observer, config.Timeout,
client := newClient(conn, observer, config.Timeout,
config.Password, config.Db, key, dataType, config.Index, enc)
clients[i] = newBackoffClient(client, config.Backoff.Init, config.Backoff.Max)
}

return outputs.SuccessNet(config.LoadBalance, config.BulkMaxSize, config.MaxRetries, clients)
6 changes: 3 additions & 3 deletions libbeat/outputs/redis/redis_integration_test.go
@@ -280,7 +280,7 @@ func getSRedisAddr() string {
getEnv("SREDIS_PORT", SRedisDefaultPort))
}

func newRedisTestingOutput(t *testing.T, cfg map[string]interface{}) *client {
func newRedisTestingOutput(t *testing.T, cfg map[string]interface{}) outputs.Client {
config, err := common.NewConfigFrom(cfg)
if err != nil {
t.Fatalf("Error reading config: %v", err)
@@ -296,15 +296,15 @@ func newRedisTestingOutput(t *testing.T, cfg map[string]interface{}) *client {
t.Fatalf("Failed to initialize redis output: %v", err)
}

client := out.Clients[0].(*client)
client := out.Clients[0].(outputs.NetworkClient)
if err := client.Connect(); err != nil {
t.Fatalf("Failed to connect to redis host: %v", err)
}

return client
}

func sendTestEvents(out *client, batches, N int) error {
func sendTestEvents(out outputs.Client, batches, N int) error {
i := 1
for b := 0; b < batches; b++ {
events := make([]beat.Event, N)
11 changes: 11 additions & 0 deletions metricbeat/metricbeat.reference.yml
@@ -1298,6 +1298,17 @@ output.elasticsearch:
# until all events are published. The default is 3.
#max_retries: 3

# The number of seconds to wait before trying to reconnect to Redis
# after a network error. After waiting backoff.init seconds, the Beat
# tries to reconnect. If the attempt fails, the backoff timer is increased
# exponentially up to backoff.max. After a successful connection, the backoff
# timer is reset. The default is 1s.
#backoff.init: 1s

# The maximum number of seconds to wait before attempting to connect to
# Redis after a network error. The default is 60s.
#backoff.max: 60s

# The maximum number of events to bulk in a single Redis request or pipeline.
# The default is 2048.
#bulk_max_size: 2048
11 changes: 11 additions & 0 deletions packetbeat/packetbeat.reference.yml
@@ -1101,6 +1101,17 @@ output.elasticsearch:
# until all events are published. The default is 3.
#max_retries: 3

# The number of seconds to wait before trying to reconnect to Redis
# after a network error. After waiting backoff.init seconds, the Beat
# tries to reconnect. If the attempt fails, the backoff timer is increased
# exponentially up to backoff.max. After a successful connection, the backoff
# timer is reset. The default is 1s.
#backoff.init: 1s

# The maximum number of seconds to wait before attempting to connect to
# Redis after a network error. The default is 60s.
#backoff.max: 60s

# The maximum number of events to bulk in a single Redis request or pipeline.
# The default is 2048.
#bulk_max_size: 2048
11 changes: 11 additions & 0 deletions winlogbeat/winlogbeat.reference.yml
@@ -653,6 +653,17 @@ output.elasticsearch:
# until all events are published. The default is 3.
#max_retries: 3

# The number of seconds to wait before trying to reconnect to Redis
# after a network error. After waiting backoff.init seconds, the Beat
# tries to reconnect. If the attempt fails, the backoff timer is increased
# exponentially up to backoff.max. After a successful connection, the backoff
# timer is reset. The default is 1s.
#backoff.init: 1s

# The maximum number of seconds to wait before attempting to connect to
# Redis after a network error. The default is 60s.
#backoff.max: 60s

# The maximum number of events to bulk in a single Redis request or pipeline.
# The default is 2048.
#bulk_max_size: 2048
