Skip to content

Commit 1bba206

Browse files
herbxurdner
andauthored
[libbeat] kafka message header support (#29940)
Now it's possible to configure headers that will be attached to each message in the Kafka output. Co-authored-by: Denis Rechkunov <denis@rdner.de>
1 parent 06c810c commit 1bba206

File tree

6 files changed

+42
-1
lines changed

6 files changed

+42
-1
lines changed

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif
9191
- Add `script` processor to all beats {issue}29269[29269] {pull}29752[29752]
9292
- Only connect to Elasticsearch instances with the same version or newer. {pull}29683[29683]
9393
- Move umask from code to service files. {pull}29708[29708]
94+
- Add support for kafka message headers. {pull}29940[29940]
9495
- Add FIPS configuration option for all AWS API calls. {pull}[28899]
9596
- Add metadata change support for some processors {pull}30183[30183]
9697

libbeat/outputs/kafka/client.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ type client struct {
5353

5454
producer sarama.AsyncProducer
5555

56+
recordHeaders []sarama.RecordHeader
57+
5658
wg sync.WaitGroup
5759
}
5860

@@ -76,6 +78,7 @@ func newKafkaClient(
7678
index string,
7779
key *fmtstr.EventFormatString,
7880
topic outil.Selector,
81+
headers map[string]string,
7982
writer codec.Codec,
8083
cfg *sarama.Config,
8184
) (*client, error) {
@@ -90,6 +93,20 @@ func newKafkaClient(
9093
config: *cfg,
9194
done: make(chan struct{}),
9295
}
96+
97+
if len(headers) != 0 {
98+
recordHeaders := make([]sarama.RecordHeader, 0, len(headers))
99+
for k, v := range headers {
100+
recordHeader := sarama.RecordHeader{
101+
Key: []byte(k),
102+
Value: []byte(v),
103+
}
104+
105+
recordHeaders = append(recordHeaders, recordHeader)
106+
}
107+
c.recordHeaders = recordHeaders
108+
}
109+
93110
return c, nil
94111
}
95112

libbeat/outputs/kafka/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ type kafkaConfig struct {
6262
BulkMaxSize int `config:"bulk_max_size"`
6363
BulkFlushFrequency time.Duration `config:"bulk_flush_frequency"`
6464
MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"`
65+
Headers map[string]string `config:"headers"`
6566
Backoff backoffConfig `config:"backoff"`
6667
ClientID string `config:"client_id"`
6768
ChanBufferSize int `config:"channel_buffer_size" validate:"min=1"`
@@ -125,6 +126,7 @@ func defaultConfig() kafkaConfig {
125126
CompressionLevel: 4,
126127
Version: kafka.Version("1.0.0"),
127128
MaxRetries: 3,
129+
Headers: nil,
128130
Backoff: backoffConfig{
129131
Init: 1 * time.Second,
130132
Max: 60 * time.Second,

libbeat/outputs/kafka/kafka.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func makeKafka(
7272
return outputs.Fail(err)
7373
}
7474

75-
client, err := newKafkaClient(observer, hosts, beat.IndexPrefix, config.Key, topic, codec, libCfg)
75+
client, err := newKafkaClient(observer, hosts, beat.IndexPrefix, config.Key, topic, config.Headers, codec, libCfg)
7676
if err != nil {
7777
return outputs.Fail(err)
7878
}

libbeat/outputs/kafka/kafka_integration_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,19 @@ func TestKafkaPublish(t *testing.T) {
215215
"message": id,
216216
}),
217217
},
218+
{
219+
"publish message with kafka headers to test topic",
220+
map[string]interface{}{
221+
"headers": map[string]string{
222+
"app": "test-app",
223+
"host": "test-host",
224+
},
225+
},
226+
testTopic,
227+
randMulti(5, 100, common.MapStr{
228+
"host": "test-host",
229+
}),
230+
},
218231
}
219232

220233
defaultConfig := map[string]interface{}{
@@ -279,6 +292,10 @@ func TestKafkaPublish(t *testing.T) {
279292

280293
seenMsgs := map[string]struct{}{}
281294
for _, s := range stored {
295+
if headers, exists := test.config["headers"]; exists {
296+
assert.Equal(t, len(headers.(map[string]string)), len(s.Headers))
297+
}
298+
282299
msg := validate(t, s.Value, expected)
283300
seenMsgs[msg] = struct{}{}
284301
}

libbeat/outputs/kafka/message.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,4 +50,8 @@ func (m *message) initProducerMessage() {
5050
Value: sarama.ByteEncoder(m.value),
5151
Timestamp: m.ts,
5252
}
53+
54+
if m.ref != nil {
55+
m.msg.Headers = m.ref.client.recordHeaders
56+
}
5357
}

0 commit comments

Comments
 (0)