Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Renamed redis.index option to redis.key #2077

Merged
merged 1 commit into from
Jul 25, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha4...master[Check the HEAD d
==== Breaking changes

*Affecting all Beats*

- Rename the `filters` section to `processors`. {pull}1944[1944]
- Introduce the condition with `when` in the processor configuration. {pull}1949[1949]
- The Elasticsearch template is now loaded by default. {pull}1993[1993]
- The Redis output `index` setting is renamed to `key`. `index` still works but it's deprecated. {pull}2077[2077]
- The undocumented file output `index` setting was removed. Use `filename` instead. {pull}2077[2077]

*Metricbeat*

Expand Down
8 changes: 4 additions & 4 deletions filebeat/filebeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,8 @@ filebeat.prospectors:

#================================ Processors =====================================

# Processors are used to reduce the number of fields in the exported event or to
# enhance the event with external meta data. This section defines a list of processors
# Processors are used to reduce the number of fields in the exported event or to
# enhance the event with external meta data. This section defines a list of processors
# that are applied one by one and the first one receives the initial event:
#
# event -> filter1 -> event1 -> filter2 ->event2 ...
Expand Down Expand Up @@ -380,7 +380,7 @@ output.elasticsearch:
#template.overwrite: false

# If set to true, filebeat checks the Elasticsearch version at connect time, and if it
# is 2.x, it loads the file specified by the template.versions.2x.path setting. The
# is 2.x, it loads the file specified by the template.versions.2x.path setting. The
# default is true.
#template.versions.2x.enabled: true

Expand Down Expand Up @@ -577,7 +577,7 @@ output.elasticsearch:

# The name of the Redis list or channel the events are published to. The
# default is filebeat.
#index: filebeat
#key: filebeat

# The password to authenticate with. The default is no authentication.
#password:
Expand Down
2 changes: 1 addition & 1 deletion libbeat/_meta/config.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ output.elasticsearch:

# The name of the Redis list or channel the events are published to. The
# default is beatname.
#index: beatname
#key: beatname

# The password to authenticate with. The default is no authentication.
#password:
Expand Down
11 changes: 9 additions & 2 deletions libbeat/docs/outputconfig.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -631,8 +631,8 @@ output.redis:
# password is set.
password: "my_password"

# Optional index name. The default is {beatname_lc} and generates {beatname_lc} keys.
index: "{beatname_lc}"
# Optional key name. The default is {beatname_lc}
key: "{beatname_lc}"

# Optional Redis database number where the events are stored
# The default is 0.
Expand Down Expand Up @@ -668,6 +668,13 @@ The Redis port to use if `hosts` does not contain a port number. The default is

===== index

deprecated[5.0.0-alpha5,The `index` setting is renamed to `key.]

The name of the Redis list or channel the events are published to. The default is
"{beatname_lc}".

===== key

The name of the Redis list or channel the events are published to. The default is
"{beatname_lc}".

Expand Down
4 changes: 4 additions & 0 deletions libbeat/outputs/elasticsearch/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func New(beatName string, cfg *common.Config, topologyExpire int) (outputs.Outpu
cfg.SetInt("bulk_max_size", -1, defaultBulkSize)
}

if !cfg.HasField("index") {
cfg.SetString("index", -1, beatName)
}

output := &elasticsearchOutput{beatName: beatName}
err := output.init(cfg, topologyExpire)
if err != nil {
Expand Down
5 changes: 0 additions & 5 deletions libbeat/outputs/fileout/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
)

type config struct {
Index string `config:"index"`
Path string `config:"path"`
Filename string `config:"filename"`
RotateEveryKb int `config:"rotate_every_kb" validate:"min=1"`
Expand All @@ -22,10 +21,6 @@ var (
)

func (c *config) Validate() error {
if c.Filename == "" && c.Index == "" {
return fmt.Errorf("File logging requires filename or index being set.")
}

if c.NumberOfFiles < 2 || c.NumberOfFiles > logp.RotatorMaxFiles {
return fmt.Errorf("The number_of_files to keep should be between 2 and %v",
logp.RotatorMaxFiles)
Expand Down
5 changes: 5 additions & 0 deletions libbeat/outputs/logstash/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ func init() {
}

func new(beatName string, cfg *common.Config, _ int) (outputs.Outputer, error) {

if !cfg.HasField("index") {
cfg.SetString("index", -1, beatName)
}

output := &logstash{}
if err := output.init(cfg); err != nil {
return nil, err
Expand Down
4 changes: 0 additions & 4 deletions libbeat/outputs/outputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,6 @@ func InitOutputs(
continue
}

if !config.HasField("index") {
config.SetString("index", -1, beatName)
}

output, err := plugin(beatName, config, topologyExpire)
if err != nil {
logp.Err("failed to initialize %s plugin as output: %s", name, err)
Expand Down
12 changes: 10 additions & 2 deletions libbeat/outputs/redis/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import (
"fmt"
"time"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/transport"
)

type redisConfig struct {
Password string `config:"password"`
Index string `config:"index"`
Key string `config:"key"`
Port int `config:"port"`
LoadBalance bool `config:"loadbalance"`
Timeout time.Duration `config:"timeout"`
Expand Down Expand Up @@ -49,8 +51,14 @@ func (c *redisConfig) Validate() error {
return fmt.Errorf("redis data type %v not supported", c.DataType)
}

if c.Index == "" {
return errors.New("index required")
if c.Key != "" && c.Index != "" {
return errors.New("Cannot use both `output.redis.key` and `output.redis.index` configuration options." +
" Set only `output.redis.key`")
}

if c.Key == "" && c.Index != "" {
c.Key = c.Index
logp.Warn("The `output.redis.index` configuration setting is deprecated. Use `output.redis.key` instead.")
}

return nil
Expand Down
30 changes: 30 additions & 0 deletions libbeat/outputs/redis/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package redis

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestValidate(t *testing.T) {
type io struct {
Name string
Input redisConfig
Valid bool
}

tests := []io{
io{"No config", redisConfig{Key: "", Index: ""}, true},
io{"Only key", redisConfig{Key: "test", Index: ""}, true},
io{"Only index", redisConfig{Key: "", Index: "test"}, true},
io{"Both", redisConfig{Key: "test", Index: "test"}, false},

io{"Invalid Datatype", redisConfig{Key: "test", DataType: "something"}, false},
io{"List Datatype", redisConfig{Key: "test", DataType: "list"}, true},
io{"Channel Datatype", redisConfig{Key: "test", DataType: "channel"}, true},
}

for _, test := range tests {
assert.Equal(t, test.Input.Validate() == nil, test.Valid, test.Name)
}
}
12 changes: 6 additions & 6 deletions libbeat/outputs/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package redis
import (
"errors"
"expvar"
"fmt"
"time"

"github.com/elastic/beats/libbeat/common"
Expand All @@ -18,6 +17,7 @@ import (
type redisOut struct {
mode mode.ConnectionMode
topology
beatName string
}

var debugf = logp.MakeDebug("redis")
Expand All @@ -40,7 +40,7 @@ func init() {
}

func new(beatName string, cfg *common.Config, expireTopo int) (outputs.Outputer, error) {
r := &redisOut{}
r := &redisOut{beatName: beatName}
if err := r.init(cfg, expireTopo); err != nil {
return nil, err
}
Expand Down Expand Up @@ -69,9 +69,9 @@ func (r *redisOut) init(cfg *common.Config, expireTopo int) error {
return errors.New("Bad Redis data type")
}

index := []byte(config.Index)
if len(index) == 0 {
return fmt.Errorf("missing %v", cfg.PathOf("index"))
key := []byte(config.Key)
if len(key) == 0 {
key = []byte(r.beatName)
}

tls, err := outputs.LoadTLSConfig(config.TLS)
Expand Down Expand Up @@ -105,7 +105,7 @@ func (r *redisOut) init(cfg *common.Config, expireTopo int) error {
if err != nil {
return nil, err
}
return newClient(t, config.Password, config.Db, index, dataType), nil
return newClient(t, config.Password, config.Db, key, dataType), nil
})
if err != nil {
return err
Expand Down
42 changes: 21 additions & 21 deletions libbeat/outputs/redis/redis_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ const (

func TestTopologyInRedisTCP(t *testing.T) {
db := 1
index := "test_topo_tcp"
key := "test_topo_tcp"
redisHosts := []string{getRedisAddr()}
redisConfig := map[string]interface{}{
"hosts": redisHosts,
"index": index,
"key": key,
"host_topology": redisHosts[0],
"db_topology": db,
"timeout": "5s",
Expand All @@ -40,11 +40,11 @@ func TestTopologyInRedisTCP(t *testing.T) {

func TestTopologyInRedisTLS(t *testing.T) {
db := 1
index := "test_topo_tls"
key := "test_topo_tls"
redisHosts := []string{getSRedisAddr()}
redisConfig := map[string]interface{}{
"hosts": redisHosts,
"index": index,
"key": key,
"host_topology": redisHosts[0],
"db_topology": db,
"timeout": "5s",
Expand All @@ -70,7 +70,7 @@ func testTopologyInRedis(t *testing.T, cfg map[string]interface{}) {
}

db := 0
index := cfg["index"].(string)
key := cfg["key"].(string)
if v, ok := cfg["db_topology"]; ok {
db = v.(int)
}
Expand All @@ -83,7 +83,7 @@ func testTopologyInRedis(t *testing.T, cfg map[string]interface{}) {
}
// delete old key if present
defer conn.Close()
conn.Do("DEL", index)
conn.Do("DEL", key)
}

// 1. connect
Expand Down Expand Up @@ -116,11 +116,11 @@ func testTopologyInRedis(t *testing.T, cfg map[string]interface{}) {
}

func TestPublishListTCP(t *testing.T) {
index := "test_publist_tcp"
key := "test_publist_tcp"
db := 0
redisConfig := map[string]interface{}{
"hosts": []string{getRedisAddr()},
"index": index,
"key": key,
"db": db,
"datatype": "list",
"timeout": "5s",
Expand All @@ -130,11 +130,11 @@ func TestPublishListTCP(t *testing.T) {
}

func TestPublishListTLS(t *testing.T) {
index := "test_publist_tls"
key := "test_publist_tls"
db := 0
redisConfig := map[string]interface{}{
"hosts": []string{getSRedisAddr()},
"index": index,
"key": key,
"db": db,
"datatype": "list",
"timeout": "5s",
Expand All @@ -154,7 +154,7 @@ func testPublishList(t *testing.T, cfg map[string]interface{}) {
total := batches & batchSize

db := 0
index := cfg["index"].(string)
key := cfg["key"].(string)
if v, ok := cfg["db"]; ok {
db = v.(int)
}
Expand All @@ -166,15 +166,15 @@ func testPublishList(t *testing.T, cfg map[string]interface{}) {

// delete old key if present
defer conn.Close()
conn.Do("DEL", index)
conn.Do("DEL", key)

out := newRedisTestingOutput(t, cfg)
err = sendTestEvents(out, batches, batchSize)
assert.NoError(t, err)

results := make([][]byte, total)
for i := range results {
results[i], err = redis.Bytes(conn.Do("LPOP", index))
results[i], err = redis.Bytes(conn.Do("LPOP", key))
assert.NoError(t, err)
}

Expand All @@ -188,10 +188,10 @@ func testPublishList(t *testing.T, cfg map[string]interface{}) {

func TestPublishChannelTCP(t *testing.T) {
db := 0
index := "test_pubchan_tcp"
key := "test_pubchan_tcp"
redisConfig := map[string]interface{}{
"hosts": []string{getRedisAddr()},
"index": index,
"key": key,
"db": db,
"datatype": "channel",
"timeout": "5s",
Expand All @@ -202,10 +202,10 @@ func TestPublishChannelTCP(t *testing.T) {

func TestPublishChannelTLS(t *testing.T) {
db := 0
index := "test_pubchan_tls"
key := "test_pubchan_tls"
redisConfig := map[string]interface{}{
"hosts": []string{getSRedisAddr()},
"index": index,
"key": key,
"db": db,
"datatype": "channel",
"timeout": "5s",
Expand All @@ -225,7 +225,7 @@ func testPublishChannel(t *testing.T, cfg map[string]interface{}) {
total := batches & batchSize

db := 0
index := cfg["index"].(string)
key := cfg["key"].(string)
if v, ok := cfg["db"]; ok {
db = v.(int)
}
Expand All @@ -237,14 +237,14 @@ func testPublishChannel(t *testing.T, cfg map[string]interface{}) {

// delete old key if present
defer conn.Close()
conn.Do("DEL", index)
conn.Do("DEL", key)

// subscribe to packetbeat channel
psc := redis.PubSubConn{conn}
if err := psc.Subscribe(index); err != nil {
if err := psc.Subscribe(key); err != nil {
t.Fatal(err)
}
defer psc.Unsubscribe(index)
defer psc.Unsubscribe(key)

// connect and publish events
var wg sync.WaitGroup
Expand Down
Loading