[Enhancement] add output kafka support for zstd (#40880)
* add output kafka support for zstd

* add docs

* add docs

* revert go mod

* add more comments

* add kafka_integration_test

* modify test

* modify test

* modify test

* modify test

* add test

* fixes for linter

* change comment

* change version

* change version

---------

Co-authored-by: weiye.gong <weiye.gong@garena.com>
Co-authored-by: Lee E. Hinman <lee.e.hinman@elastic.co>
Co-authored-by: Pierre HILBERT <pierre.hilbert@elastic.co>
(cherry picked from commit 77e7d80)
gwy1995 authored and mergify[bot] committed Oct 23, 2024
1 parent dadccd2 commit 9b111b9
Showing 5 changed files with 33 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
@@ -38,6 +38,7 @@ https://github.com/elastic/beats/compare/v8.15.2\...v8.15.3[View commits]
*Affecting all Beats*

- Update Go version to 1.22.8. {pull}41139[41139]
- Add Kafka compression support for ZSTD. {pull}40880[40880]

*Metricbeat*

1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -49,6 +49,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- The performance of ingesting SQS data with the S3 input has improved by up to 60x for queues with many small events. `max_number_of_messages` config for SQS mode is now ignored, as the new design no longer needs a manual cap on messages. Instead, use `number_of_workers` to scale ingestion rate in both S3 and SQS modes. The increased efficiency may increase network bandwidth consumption, which can be throttled by lowering `number_of_workers`. It may also increase number of events stored in memory, which can be throttled by lowering the configured size of the internal queue. {pull}40699[40699]
- System module events now contain `input.type: systemlogs` instead of `input.type: log` when harvesting log files. {pull}41061[41061]

- Add Kafka compression support for ZSTD. {pull}40880[40880]

*Heartbeat*

6 changes: 6 additions & 0 deletions libbeat/outputs/kafka/config.go
@@ -100,12 +100,18 @@ var compressionModes = map[string]sarama.CompressionCodec{
// As of sarama 1.24.1, zstd support was broken
// (https://github.com/Shopify/sarama/issues/1252), which had to be
// addressed before we could add support here.

// Sarama 1.26.0 fixed the issue (https://github.com/IBM/sarama/pull/1574),
// and the elastic fork of sarama has merged that fix
// (https://github.com/elastic/sarama/commit/37faed7ffc7d59e681d99cfebd1f3d453d6d607c),
// so zstd can now be offered.

"none": sarama.CompressionNone,
"no": sarama.CompressionNone,
"off": sarama.CompressionNone,
"gzip": sarama.CompressionGZIP,
"lz4": sarama.CompressionLZ4,
"snappy": sarama.CompressionSnappy,
"zstd": sarama.CompressionZSTD,
}

func defaultConfig() kafkaConfig {
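For context on how this map is consumed: the selected codec name is looked up and assigned to sarama's producer configuration, and sarama itself enforces the protocol-version requirement for zstd. A minimal standalone sketch, not part of this commit; the `Shopify/sarama` import path (replaced by the elastic fork in beats' go.mod) and the `V2_1_0_0` pin are assumptions based on the sarama API of the period:

```go
package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama" // beats swaps this for the elastic fork via a go.mod replace
)

// compressionModes mirrors the map from libbeat/outputs/kafka/config.go.
var compressionModes = map[string]sarama.CompressionCodec{
	"none":   sarama.CompressionNone,
	"no":     sarama.CompressionNone,
	"off":    sarama.CompressionNone,
	"gzip":   sarama.CompressionGZIP,
	"lz4":    sarama.CompressionLZ4,
	"snappy": sarama.CompressionSnappy,
	"zstd":   sarama.CompressionZSTD,
}

func main() {
	codec, ok := compressionModes["zstd"]
	if !ok {
		log.Fatal("unsupported compression codec")
	}

	cfg := sarama.NewConfig()
	cfg.Producer.Compression = codec
	// sarama only accepts zstd with Kafka protocol version 2.1.0 or
	// newer, so the version must be raised from the default for
	// Validate() to pass.
	cfg.Version = sarama.V2_1_0_0

	if err := cfg.Validate(); err != nil {
		log.Fatal(err)
	}
	fmt.Println("producer config with zstd compression is valid")
}
```

Against sarama 1.26.0 or newer this prints the success line; leaving `cfg.Version` at its default makes `Validate()` fail instead, which is why the integration test below pins a protocol version.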
2 changes: 1 addition & 1 deletion libbeat/outputs/kafka/docs/kafka.asciidoc
@@ -300,7 +300,7 @@ The keep-alive period for an active network connection. If 0s, keep-alives are disabled.

===== `compression`

- Sets the output compression codec. Must be one of `none`, `snappy`, `lz4` and `gzip`. The default is `gzip`.
+ Sets the output compression codec. Must be one of `none`, `snappy`, `lz4`, `gzip` and `zstd`. The default is `gzip`.

[IMPORTANT]
.Known issue with Azure Event Hub for Kafka
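Not part of the diff, but for orientation, the new codec in a Beat's YAML configuration might look like the following sketch (the hosts, topic, and exact `version` value are illustrative, not taken from this commit):

```yaml
output.kafka:
  hosts: ["localhost:9092"]
  topic: "my-topic"
  compression: zstd
  # zstd requires Kafka protocol version 2.1.0 or newer, so raise the
  # requested protocol version from its default.
  version: "2.2"
```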
28 changes: 24 additions & 4 deletions libbeat/outputs/kafka/kafka_integration_test.go
@@ -240,6 +240,18 @@ func TestKafkaPublish(t *testing.T) {
"host": "test-host",
}),
},
{
"publish message with zstd compression to test topic",
map[string]interface{}{
"compression": "zstd",
"version": "2.2",
},
testTopic,
single(mapstr.M{
"host": "test-host",
"message": id,
}),
},
}

defaultConfig := map[string]interface{}{
@@ -254,7 +266,10 @@

cfg := makeConfig(t, defaultConfig)
if test.config != nil {
- cfg.Merge(makeConfig(t, test.config))
+ err := cfg.Merge(makeConfig(t, test.config))
+ if err != nil {
+ 	t.Fatal(err)
+ }
}

t.Run(name, func(t *testing.T) {
@@ -263,7 +278,8 @@
t.Fatal(err)
}

- output := grp.Clients[0].(*client)
+ output, ok := grp.Clients[0].(*client)
+ assert.True(t, ok, "grp.Clients[0] didn't contain a ptr to client")
if err := output.Connect(); err != nil {
t.Fatal(err)
}
@@ -279,7 +295,10 @@
}

wg.Add(1)
- output.Publish(context.Background(), batch)
+ err := output.Publish(context.Background(), batch)
+ if err != nil {
+ 	t.Fatal(err)
+ }
}

// wait for all published batches to be ACKed
@@ -335,7 +354,8 @@ func validateJSON(t *testing.T, value []byte, events []beat.Event) string {
return ""
}

- msg := decoded["message"].(string)
+ msg, ok := decoded["message"].(string)
+ assert.True(t, ok, "type of decoded message was not string")
event := findEvent(events, msg)
if event == nil {
t.Errorf("could not find expected event with message: %v", msg)
