Skip to content

Commit

Permalink
Add kafka topic selection tests
Browse files Browse the repository at this point in the history
  • Loading branch information
urso committed Jun 9, 2020
1 parent 4691667 commit b93f27c
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 7 deletions.
62 changes: 62 additions & 0 deletions libbeat/outputs/kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/internal/testutil"
"github.com/elastic/beats/v7/libbeat/logp"
Expand Down Expand Up @@ -131,3 +132,64 @@ func TestBackoffFunc(t *testing.T) {
})
}
}

func TestTopicSelection(t *testing.T) {
cases := map[string]struct {
cfg map[string]interface{}
event beat.Event
want string
}{
"topic configured": {
cfg: map[string]interface{}{"topic": "test"},
want: "test",
},
"topic must keep case": {
cfg: map[string]interface{}{"topic": "Test"},
want: "Test",
},
"topics setting": {
cfg: map[string]interface{}{
"topics": []map[string]interface{}{{"topic": "test"}},
},
want: "test",
},
"topics setting must keep case": {
cfg: map[string]interface{}{
"topics": []map[string]interface{}{{"topic": "Test"}},
},
want: "Test",
},
"use event field": {
cfg: map[string]interface{}{"topic": "test-%{[field]}"},
event: beat.Event{
Fields: common.MapStr{"field": "from-event"},
},
want: "test-from-event",
},
"use event field must keep case": {
cfg: map[string]interface{}{"topic": "Test-%{[field]}"},
event: beat.Event{
Fields: common.MapStr{"field": "From-Event"},
},
want: "Test-From-Event",
},
}

for name, test := range cases {
t.Run(name, func(t *testing.T) {
selector, err := buildTopicSelector(common.MustNewConfigFrom(test.cfg))
if err != nil {
t.Fatalf("Failed to parse configuration: %v", err)
}

got, err := selector.Select(&test.event)
if err != nil {
t.Fatalf("Failed to create topic name: %v", err)
}

if test.want != got {
t.Errorf("Pipeline name missmatch (want: %v, got: %v)", test.want, got)
}
})
}
}
18 changes: 11 additions & 7 deletions libbeat/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,7 @@ func makeKafka(
return outputs.Fail(err)
}

topic, err := outil.BuildSelectorFromConfig(cfg, outil.Settings{
Key: "topic",
MultiKey: "topics",
EnableSingleOnly: true,
FailEmpty: true,
Case: outil.SelectorKeepCase,
})
topic, err := buildTopicSelector(cfg)
if err != nil {
return outputs.Fail(err)
}
Expand Down Expand Up @@ -103,3 +97,13 @@ func makeKafka(
}
return outputs.Success(config.BulkMaxSize, retry, client)
}

func buildTopicSelector(cfg *common.Config) (outil.Selector, error) {
return outil.BuildSelectorFromConfig(cfg, outil.Settings{
Key: "topic",
MultiKey: "topics",
EnableSingleOnly: true,
FailEmpty: true,
Case: outil.SelectorKeepCase,
})
}

0 comments on commit b93f27c

Please sign in to comment.