[exporter/elasticsearch] Enable gzip compression by default (open-telemetry#35865)

#### Description

Enable gzip compression by default, at the hardcoded level `BestSpeed`. To
disable compression, set `compression` to `none`.
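
For reference, a minimal sketch of a collector config that opts out of the new default (the endpoint is a placeholder):

```yaml
exporters:
  elasticsearch:
    endpoint: https://elastic.example.com:9200
    # gzip (at BestSpeed) is now the default; opt out explicitly:
    compression: none
```

Leaving `compression` unset now sends gzip-compressed bulk requests.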

carsonip authored Oct 21, 2024
1 parent 33a5457 commit 1ef15b9
Showing 11 changed files with 169 additions and 24 deletions.
27 changes: 27 additions & 0 deletions .chloggen/elasticsearchexporter_compression-gzip.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Enable gzip compression by default

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35865]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: To disable compression, set config `compression` to `none`.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
2 changes: 1 addition & 1 deletion exporter/elasticsearchexporter/README.md
@@ -71,7 +71,7 @@ service:
### HTTP settings
The Elasticsearch exporter supports common [HTTP Configuration Settings][confighttp], except for `compression` (all requests are uncompressed).
The Elasticsearch exporter supports common [HTTP Configuration Settings][confighttp]. Gzip compression is enabled by default. To disable compression, set `compression` to `none`.
As a consequence of supporting [confighttp], the Elasticsearch exporter also supports common [TLS Configuration Settings][configtls].

The Elasticsearch exporter sets `timeout` (HTTP request timeout) to 90s by default.
7 changes: 7 additions & 0 deletions exporter/elasticsearchexporter/bulkindexer.go
@@ -4,6 +4,7 @@
package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"

import (
"compress/gzip"
"context"
"errors"
"io"
@@ -15,6 +16,7 @@ import (

"github.com/elastic/go-docappender/v2"
"github.com/elastic/go-elasticsearch/v7"
"go.opentelemetry.io/collector/config/configcompression"
"go.uber.org/zap"
)

@@ -68,12 +70,17 @@ func bulkIndexerConfig(client *elasticsearch.Client, config *Config) docappender
maxDocRetries = config.Retry.MaxRetries
}
}
var compressionLevel int
if config.Compression == configcompression.TypeGzip {
compressionLevel = gzip.BestSpeed
}
return docappender.BulkIndexerConfig{
Client: client,
MaxDocumentRetries: maxDocRetries,
Pipeline: config.Pipeline,
RetryOnDocumentStatus: config.Retry.RetryOnStatus,
RequireDataStream: config.MappingMode() == MappingOTel,
CompressionLevel: compressionLevel,
}
}

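As a rough illustration of what the hardcoded level means, here is a standalone sketch (not the exporter's actual code path; the compression itself happens inside go-docappender) that gzips a bulk NDJSON payload at `gzip.BestSpeed`:

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"log"
)

func main() {
	// A two-line NDJSON bulk payload, as the exporter would send to /_bulk.
	payload := []byte("{\"create\":{\"_index\":\"foo\"}}\n{\"foo\": \"bar\"}\n")

	var buf bytes.Buffer
	// BestSpeed (level 1) trades compression ratio for CPU, which is the
	// level the exporter hardcodes when compression is gzip.
	zw, err := gzip.NewWriterLevel(&buf, gzip.BestSpeed)
	if err != nil {
		log.Fatal(err)
	}
	if _, err := zw.Write(payload); err != nil {
		log.Fatal(err)
	}
	if err := zw.Close(); err != nil {
		log.Fatal(err)
	}
	// Tiny payloads can grow under gzip; realistic bulk bodies shrink.
	fmt.Printf("raw=%d bytes, gzip=%d bytes\n", len(payload), buf.Len())
}
```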
94 changes: 80 additions & 14 deletions exporter/elasticsearchexporter/bulkindexer_test.go
@@ -15,6 +15,7 @@ import (
"github.com/elastic/go-elasticsearch/v7"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/config/confighttp"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
@@ -62,13 +63,8 @@ func TestAsyncBulkIndexer_flushOnClose(t *testing.T) {
}})
require.NoError(t, err)

bulkIndexer, err := newAsyncBulkIndexer(zap.NewNop(), client, &cfg)
require.NoError(t, err)
session, err := bulkIndexer.StartSession(context.Background())
require.NoError(t, err)
bulkIndexer := runBulkIndexerOnce(t, &cfg, client)

assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil))
assert.NoError(t, bulkIndexer.Close(context.Background()))
assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load())
}

@@ -157,13 +153,7 @@ func TestAsyncBulkIndexer_requireDataStream(t *testing.T) {
}})
require.NoError(t, err)

bulkIndexer, err := newAsyncBulkIndexer(zap.NewNop(), client, &tt.config)
require.NoError(t, err)
session, err := bulkIndexer.StartSession(context.Background())
require.NoError(t, err)

assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil))
assert.NoError(t, bulkIndexer.Close(context.Background()))
runBulkIndexerOnce(t, &tt.config, client)

assert.Equal(t, tt.wantRequireDataStream, <-requireDataStreamCh)
})
@@ -234,14 +224,15 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) {

bulkIndexer, err := newAsyncBulkIndexer(zap.New(core), client, &cfg)
require.NoError(t, err)
defer bulkIndexer.Close(context.Background())

session, err := bulkIndexer.StartSession(context.Background())
require.NoError(t, err)

assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil))
// should flush
time.Sleep(100 * time.Millisecond)
assert.Equal(t, int64(0), bulkIndexer.stats.docsIndexed.Load())
assert.NoError(t, bulkIndexer.Close(context.Background()))
messages := observed.FilterMessage(tt.wantMessage)
require.Equal(t, 1, messages.Len(), "message not found; observed.All()=%v", observed.All())
for _, wantField := range tt.wantFields {
@@ -250,3 +241,78 @@
})
}
}

func TestAsyncBulkIndexer_logRoundTrip(t *testing.T) {
tests := []struct {
name string
config Config
}{
{
name: "compression none",
config: Config{
NumWorkers: 1,
ClientConfig: confighttp.ClientConfig{Compression: "none"},
},
},
{
name: "compression gzip",
config: Config{
NumWorkers: 1,
ClientConfig: confighttp.ClientConfig{Compression: "gzip"},
},
},
}

for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

loggerCore, logObserver := observer.New(zap.DebugLevel)

esLogger := clientLogger{
Logger: zap.New(loggerCore),
logRequestBody: true,
logResponseBody: true,
}

client, err := elasticsearch.NewClient(elasticsearch.Config{
Transport: &mockTransport{
RoundTripFunc: func(*http.Request) (*http.Response, error) {
return &http.Response{
Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}},
Body: io.NopCloser(strings.NewReader(successResp)),
}, nil
},
},
Logger: &esLogger,
})
require.NoError(t, err)

runBulkIndexerOnce(t, &tt.config, client)

records := logObserver.AllUntimed()
assert.Len(t, records, 2)

assert.Equal(t, "/", records[0].ContextMap()["path"])
assert.Nil(t, records[0].ContextMap()["request_body"])
assert.JSONEq(t, successResp, records[0].ContextMap()["response_body"].(string))

assert.Equal(t, "/_bulk", records[1].ContextMap()["path"])
assert.Equal(t, "{\"create\":{\"_index\":\"foo\"}}\n{\"foo\": \"bar\"}\n", records[1].ContextMap()["request_body"])
assert.JSONEq(t, successResp, records[1].ContextMap()["response_body"].(string))
})
}
}

func runBulkIndexerOnce(t *testing.T, config *Config, client *elasticsearch.Client) *asyncBulkIndexer {
bulkIndexer, err := newAsyncBulkIndexer(zap.NewNop(), client, config)
require.NoError(t, err)
session, err := bulkIndexer.StartSession(context.Background())
require.NoError(t, err)

assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil))
assert.NoError(t, bulkIndexer.Close(context.Background()))

return bulkIndexer
}
6 changes: 3 additions & 3 deletions exporter/elasticsearchexporter/config.go
@@ -12,6 +12,7 @@ import (
"strings"
"time"

"go.opentelemetry.io/collector/config/configcompression"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/exporter/exporterbatcher"
@@ -273,9 +274,8 @@ func (cfg *Config) Validate() error {
return fmt.Errorf("unknown mapping mode %q", cfg.Mapping.Mode)
}

if cfg.Compression != "" {
// TODO support confighttp.ClientConfig.Compression
return errors.New("compression is not currently configurable")
if cfg.Compression != "none" && cfg.Compression != configcompression.TypeGzip {
return errors.New("compression must be one of [none, gzip]")
}

if cfg.Retry.MaxRequests != 0 && cfg.Retry.MaxRetries != 0 {
30 changes: 27 additions & 3 deletions exporter/elasticsearchexporter/config_test.go
@@ -6,6 +6,7 @@ package elasticsearchexporter
import (
"net/http"
"path/filepath"
"strings"
"testing"
"time"

@@ -38,6 +39,7 @@ func TestConfig(t *testing.T) {

defaultMaxIdleConns := 100
defaultIdleConnTimeout := 90 * time.Second
defaultCompression := configcompression.TypeGzip

tests := []struct {
configFile string
@@ -80,6 +82,7 @@ func TestConfig(t *testing.T) {
cfg.Headers = map[string]configopaque.String{
"myheader": "test",
}
cfg.Compression = defaultCompression
}),
Authentication: AuthenticationSettings{
User: "elastic",
@@ -150,6 +153,7 @@ func TestConfig(t *testing.T) {
cfg.Headers = map[string]configopaque.String{
"myheader": "test",
}
cfg.Compression = defaultCompression
}),
Authentication: AuthenticationSettings{
User: "elastic",
@@ -220,6 +224,7 @@ func TestConfig(t *testing.T) {
cfg.Headers = map[string]configopaque.String{
"myheader": "test",
}
cfg.Compression = defaultCompression
}),
Authentication: AuthenticationSettings{
User: "elastic",
@@ -301,10 +306,29 @@ func TestConfig(t *testing.T) {
cfg.Batcher.Enabled = &enabled
}),
},
{
id: component.NewIDWithName(metadata.Type, "compression_none"),
configFile: "config.yaml",
expected: withDefaultConfig(func(cfg *Config) {
cfg.Endpoint = "https://elastic.example.com:9200"

cfg.Compression = "none"
}),
},
{
id: component.NewIDWithName(metadata.Type, "compression_gzip"),
configFile: "config.yaml",
expected: withDefaultConfig(func(cfg *Config) {
cfg.Endpoint = "https://elastic.example.com:9200"

cfg.Compression = "gzip"
}),
},
}

for _, tt := range tests {
t.Run(tt.id.String(), func(t *testing.T) {
tt := tt
t.Run(strings.ReplaceAll(tt.id.String(), "/", "_"), func(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()

@@ -387,9 +411,9 @@ func TestConfig_Validate(t *testing.T) {
"compression unsupported": {
config: withDefaultConfig(func(cfg *Config) {
cfg.Endpoints = []string{"http://test:9200"}
cfg.Compression = configcompression.TypeGzip
cfg.Compression = configcompression.TypeSnappy
}),
err: `compression is not currently configurable`,
err: `compression must be one of [none, gzip]`,
},
"both max_retries and max_requests specified": {
config: withDefaultConfig(func(cfg *Config) {
10 changes: 9 additions & 1 deletion exporter/elasticsearchexporter/esclient.go
@@ -11,6 +11,7 @@ import (

"github.com/cenkalti/backoff/v4"
"github.com/elastic/go-elasticsearch/v7"
"github.com/klauspost/compress/gzip"
"go.opentelemetry.io/collector/component"
"go.uber.org/zap"

@@ -32,7 +33,14 @@ func (cl *clientLogger) LogRoundTrip(requ *http.Request, resp *http.Response, cl

var fields []zap.Field
if cl.logRequestBody && requ != nil && requ.Body != nil {
if b, err := io.ReadAll(requ.Body); err == nil {
body := requ.Body
if requ.Header.Get("Content-Encoding") == "gzip" {
if r, err := gzip.NewReader(body); err == nil {
defer r.Close()
body = r
}
}
if b, err := io.ReadAll(body); err == nil {
fields = append(fields, zap.ByteString("request_body", b))
}
}
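For illustration, a self-contained sketch of the same decompress-before-logging pattern; `readBodyForLogging` is a hypothetical helper, and it uses the standard library `compress/gzip` rather than the API-compatible klauspost fork the exporter imports:

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"log"
	"net/http"
)

// readBodyForLogging mirrors the LogRoundTrip change above: it returns the
// request body as plain text, gunzipping it when Content-Encoding is gzip.
func readBodyForLogging(req *http.Request) ([]byte, error) {
	var body io.Reader = req.Body
	if req.Header.Get("Content-Encoding") == "gzip" {
		zr, err := gzip.NewReader(body)
		if err != nil {
			return nil, err
		}
		defer zr.Close()
		body = zr
	}
	return io.ReadAll(body)
}

func main() {
	// Build a gzip-compressed request, as the exporter now does by default.
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write([]byte(`{"foo": "bar"}`)); err != nil {
		log.Fatal(err)
	}
	if err := zw.Close(); err != nil {
		log.Fatal(err)
	}

	req, err := http.NewRequest(http.MethodPost, "http://localhost:9200/_bulk", &buf)
	if err != nil {
		log.Fatal(err)
	}
	req.Header.Set("Content-Encoding", "gzip")

	b, err := readBodyForLogging(req)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(b)) // {"foo": "bar"}
}
```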
2 changes: 2 additions & 0 deletions exporter/elasticsearchexporter/factory.go
@@ -11,6 +11,7 @@ import (
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configcompression"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
@@ -44,6 +45,7 @@ func createDefaultConfig() component.Config {

httpClientConfig := confighttp.NewDefaultClientConfig()
httpClientConfig.Timeout = 90 * time.Second
httpClientConfig.Compression = configcompression.TypeGzip

return &Config{
QueueSettings: qs,
2 changes: 1 addition & 1 deletion exporter/elasticsearchexporter/go.mod
@@ -8,6 +8,7 @@ require (
github.com/elastic/go-elasticsearch/v7 v7.17.10
github.com/elastic/go-structform v0.0.12
github.com/json-iterator/go v1.1.12
github.com/klauspost/compress v1.17.10
github.com/lestrrat-go/strftime v1.1.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.111.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.111.0
@@ -44,7 +45,6 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect
github.com/klauspost/compress v1.17.10 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
github.com/knadh/koanf/v2 v2.1.1 // indirect
6 changes: 6 additions & 0 deletions exporter/elasticsearchexporter/testdata/config.yaml
@@ -86,3 +86,9 @@ elasticsearch/batcher_disabled:
endpoint: https://elastic.example.com:9200
batcher:
enabled: false
elasticsearch/compression_none:
endpoint: https://elastic.example.com:9200
compression: none
elasticsearch/compression_gzip:
endpoint: https://elastic.example.com:9200
compression: gzip
7 changes: 6 additions & 1 deletion exporter/elasticsearchexporter/utils_test.go
@@ -16,6 +16,7 @@ import (
"testing"
"time"

"github.com/klauspost/compress/gzip"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
@@ -160,7 +161,11 @@ func newESTestServer(t *testing.T, bulkHandler bulkHandler) *httptest.Server {
tsStart := time.Now()
var items []itemRequest

dec := json.NewDecoder(req.Body)
body := req.Body
if req.Header.Get("Content-Encoding") == "gzip" {
body, _ = gzip.NewReader(req.Body)
}
dec := json.NewDecoder(body)
for dec.More() {
var action, doc json.RawMessage
if err := dec.Decode(&action); err != nil {
