Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[libbeat] Go Benchmarks comparing compress/gzip and klauspost/compress #41584

Merged
merged 15 commits into from
Nov 18, 2024
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Reduce memory consumption of k8s autodiscovery and the add_kubernetes_metadata processor when Deployment metadata is enabled
- Add `lowercase` processor. {issue}22254[22254] {pull}41424[41424]
- Add `uppercase` processor. {issue}22254[22254] {pull}41535[41535]
- Replace `compress/gzip` with https://github.com/klauspost/compress/gzip library for gzip compression {pull}41584[41584]

*Auditbeat*

Expand Down
2,778 changes: 1,389 additions & 1,389 deletions NOTICE.txt

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ require (
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.5.0
github.com/icholy/digest v0.1.22
github.com/klauspost/compress v1.17.9
github.com/meraki/dashboard-api-go/v3 v3.0.9
github.com/otiai10/copy v1.12.0
github.com/pierrec/lz4/v4 v4.1.18
Expand Down Expand Up @@ -334,7 +335,6 @@ require (
github.com/json-iterator/go v1.1.12 // indirect
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/kortschak/utter v1.5.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
Expand Down
4 changes: 3 additions & 1 deletion libbeat/esleg/eslegclient/enc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ package eslegclient

import (
"bytes"
"compress/gzip"

"io"
"net/http"
"time"

"github.com/klauspost/compress/gzip"
rdner marked this conversation as resolved.
Show resolved Hide resolved

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/outputs/codec"
"github.com/elastic/elastic-agent-libs/mapstr"
Expand Down
33 changes: 32 additions & 1 deletion libbeat/internal/testutil/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@ package testutil

import (
"flag"
"fmt"
"math/rand"
"testing"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/elastic-agent-libs/mapstr"
)

var (
Expand All @@ -37,5 +41,32 @@ func SeedPRNG(t *testing.T) {
}

t.Logf("reproduce test with `go test ... -seed %v`", seed)
rand.Seed(seed)
rand.New(rand.NewSource(seed))
}

func GenerateEvents(numEvents, fieldsPerLevel, depth int) []beat.Event {
events := make([]beat.Event, numEvents)
for i := 0; i < numEvents; i++ {
event := &beat.Event{Fields: mapstr.M{}}
generateFields(event, fieldsPerLevel, depth)
events[i] = *event
}
return events
}

func generateFields(event *beat.Event, fieldsPerLevel, depth int) {
if depth == 0 {
return
}

for j := 1; j <= fieldsPerLevel; j++ {
var key string
for d := 1; d <= depth; d++ {
key += fmt.Sprintf("level%dfield%d", d, j)
key += "."
}
event.Fields.Put(key, "value")
key = ""
}

}
78 changes: 78 additions & 0 deletions libbeat/outputs/elasticsearch/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/idxmgmt"
"github.com/elastic/beats/v7/libbeat/internal/testutil"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/outputs/outest"
"github.com/elastic/beats/v7/libbeat/outputs/outil"
Expand Down Expand Up @@ -713,6 +714,83 @@ func BenchmarkCollectPublishFailAll(b *testing.B) {
}
}

func BenchmarkPublish(b *testing.B) {
tests := []struct {
Name string
Events []beat.Event
}{
{
Name: "5 events",
Events: testutil.GenerateEvents(50, 5, 3),
},
{
Name: "50 events",
Events: testutil.GenerateEvents(500, 5, 3),
},
{
Name: "500 events",
Events: testutil.GenerateEvents(500, 5, 3),
},
}

levels := []int{1, 4, 7, 9}

requestCount := 0

// start a mock HTTP server
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(b, "testing value", r.Header.Get("X-Test"))
// from the documentation: https://golang.org/pkg/net/http/
// For incoming requests, the Host header is promoted to the
// Request.Host field and removed from the Header map.
assert.Equal(b, "myhost.local", r.Host)

var response string
if r.URL.Path == "/" {
response = `{ "version": { "number": "7.6.0" } }`
} else {
response = `{"items":[{"index":{}},{"index":{}},{"index":{}}]}`

}
fmt.Fprintln(w, response)
requestCount++
}))
defer ts.Close()
mauri870 marked this conversation as resolved.
Show resolved Hide resolved

// Indexing to _bulk api
for _, test := range tests {
for _, l := range levels {
b.Run(fmt.Sprintf("%s with compression level %d", test.Name, l), func(b *testing.B) {
client, err := NewClient(
clientSettings{
connection: eslegclient.ConnectionSettings{
URL: ts.URL,
Headers: map[string]string{
"host": "myhost.local",
"X-Test": "testing value",
},
CompressionLevel: l,
},
},

nil,
)
assert.NoError(b, err)
batch := encodeBatch(client, outest.NewBatch(test.Events...))

// It uses gzip encoder internally for encoding data
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := client.Publish(context.Background(), batch)
assert.NoError(b, err)
}
})

}
}

}

func TestClientWithHeaders(t *testing.T) {
requestCount := 0
// start a mock HTTP server
Expand Down
51 changes: 6 additions & 45 deletions libbeat/processors/actions/lowercase_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
package actions

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/internal/testutil"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/mapstr"
)
Expand Down Expand Up @@ -337,21 +337,14 @@ func BenchmarkLowerCaseProcessorRun(b *testing.B) {
Events []beat.Event
}{
{
Name: "5000 events with 5 fields on each level with 3 level depth without collisions",
Events: GenerateEvents(5000, 5, 3, false),
Name: "5000 events with 5 fields on each level with 3 level depth",
Events: testutil.GenerateEvents(5000, 5, 3),
},
{
Name: "5000 events with 5 fields on each level with 3 level depth with collisions",
Events: GenerateEvents(5000, 5, 3, true),
},
{
Name: "500 events with 50 fields on each level with 5 level depth without collisions",
Events: GenerateEvents(500, 50, 3, false),
},
{
Name: "500 events with 50 fields on each level with 5 level depth with collisions",
Events: GenerateEvents(500, 50, 3, true),
Name: "500 events with 50 fields on each level with 5 level depth",
Events: testutil.GenerateEvents(500, 50, 3),
},

// Add more test cases as needed for benchmarking
}

Expand All @@ -376,35 +369,3 @@ func BenchmarkLowerCaseProcessorRun(b *testing.B) {
})
}
}

func GenerateEvents(numEvents, fieldsPerLevel, depth int, withCollisions bool) []beat.Event {
events := make([]beat.Event, numEvents)
for i := 0; i < numEvents; i++ {
event := &beat.Event{Fields: mapstr.M{}}
generateFields(event, fieldsPerLevel, depth, withCollisions)
events[i] = *event
}
return events
}

func generateFields(event *beat.Event, fieldsPerLevel, depth int, withCollisions bool) {
if depth == 0 {
return
}

for j := 1; j <= fieldsPerLevel; j++ {
var key string
for d := 1; d < depth; d++ {
key += fmt.Sprintf("level%dfield%d", d, j)
key += "."
}
if withCollisions {
key += fmt.Sprintf("Level%dField%d", depth, j) // Creating a collision (Level is capitalized)
} else {
key += fmt.Sprintf("level%dfield%d", depth, j)
}
event.Fields.Put(key, "value")
key = ""
}

}
Loading