diff --git a/libbeat/outputs/kafka/config_test.go b/libbeat/outputs/kafka/config_test.go index 3775cafd47b8..816cb8f03e87 100644 --- a/libbeat/outputs/kafka/config_test.go +++ b/libbeat/outputs/kafka/config_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/internal/testutil" "github.com/elastic/beats/v7/libbeat/logp" @@ -131,3 +132,64 @@ func TestBackoffFunc(t *testing.T) { }) } } + +func TestTopicSelection(t *testing.T) { + cases := map[string]struct { + cfg map[string]interface{} + event beat.Event + want string + }{ + "topic configured": { + cfg: map[string]interface{}{"topic": "test"}, + want: "test", + }, + "topic must keep case": { + cfg: map[string]interface{}{"topic": "Test"}, + want: "Test", + }, + "topics setting": { + cfg: map[string]interface{}{ + "topics": []map[string]interface{}{{"topic": "test"}}, + }, + want: "test", + }, + "topics setting must keep case": { + cfg: map[string]interface{}{ + "topics": []map[string]interface{}{{"topic": "Test"}}, + }, + want: "Test", + }, + "use event field": { + cfg: map[string]interface{}{"topic": "test-%{[field]}"}, + event: beat.Event{ + Fields: common.MapStr{"field": "from-event"}, + }, + want: "test-from-event", + }, + "use event field must keep case": { + cfg: map[string]interface{}{"topic": "Test-%{[field]}"}, + event: beat.Event{ + Fields: common.MapStr{"field": "From-Event"}, + }, + want: "Test-From-Event", + }, + } + + for name, test := range cases { + t.Run(name, func(t *testing.T) { + selector, err := buildTopicSelector(common.MustNewConfigFrom(test.cfg)) + if err != nil { + t.Fatalf("Failed to parse configuration: %v", err) + } + + got, err := selector.Select(&test.event) + if err != nil { + t.Fatalf("Failed to create topic name: %v", err) + } + + if test.want != got { + t.Errorf("Pipeline name missmatch (want: %v, got: %v)", test.want, got) + } + }) + } +} diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go index 25b6ba53bb9b..9be3970b1c41 100644 --- a/libbeat/outputs/kafka/kafka.go +++ b/libbeat/outputs/kafka/kafka.go @@ -66,13 +66,7 @@ func makeKafka( return outputs.Fail(err) } - topic, err := outil.BuildSelectorFromConfig(cfg, outil.Settings{ - Key: "topic", - MultiKey: "topics", - EnableSingleOnly: true, - FailEmpty: true, - Case: outil.SelectorKeepCase, - }) + topic, err := buildTopicSelector(cfg) if err != nil { return outputs.Fail(err) } @@ -103,3 +97,13 @@ func makeKafka( } return outputs.Success(config.BulkMaxSize, retry, client) } + +func buildTopicSelector(cfg *common.Config) (outil.Selector, error) { + return outil.BuildSelectorFromConfig(cfg, outil.Settings{ + Key: "topic", + MultiKey: "topics", + EnableSingleOnly: true, + FailEmpty: true, + Case: outil.SelectorKeepCase, + }) +}