Skip to content

Commit

Permalink
Add "compression" option to otlphttp exporter
Browse files Browse the repository at this point in the history
All exporters that use HTTPClientSettings will now have a choice to have a
config option "compression". The config option is an opt-in and is only
active for exporters that set HTTPClientSettings.EnableCompression=true.
Right now this is only otlphttpexporter.

The default value of "compression" option is empty and results in no compression,
which is the same as the old behavior. You can also specify "compression: gzip"
which will result in gzip encoding of outgoing http requests.
  • Loading branch information
tigrannajaryan committed Feb 17, 2021
1 parent 846b971 commit 360d81f
Show file tree
Hide file tree
Showing 13 changed files with 373 additions and 11 deletions.
2 changes: 1 addition & 1 deletion config/configgrpc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ configuration. For more information, see [configtls
README](../configtls/README.md).

- [`balancer_name`](https://github.com/grpc/grpc-go/blob/master/examples/features/load_balancing/README.md)
- `compression` (default = gzip): Compression type to use (only gzip is supported today)
- `compression` (default = none): Compression type to use (only gzip is supported today)
- `endpoint`: Valid value syntax available [here](https://github.com/grpc/grpc/blob/master/doc/naming.md)
- `headers`: name/value pairs added to the request
- [`keepalive`](https://godoc.org/google.golang.org/grpc/keepalive#ClientParameters)
Expand Down
1 change: 1 addition & 0 deletions config/confighttp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ configuration. For more information, see [configtls
README](../configtls/README.md).

- `endpoint`: address:port
- `compression` (default = none): Compression type to use (only gzip is supported today)
- `headers`: name/value pairs added to the HTTP request headers
- [`read_buffer_size`](https://golang.org/pkg/net/http/#Transport)
- [`timeout`](https://golang.org/pkg/net/http/#Client)
Expand Down
24 changes: 24 additions & 0 deletions config/confighttp/confighttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ package confighttp

import (
"crypto/tls"
"fmt"
"net"
"net/http"
"strings"
"time"

"github.com/rs/cors"

"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/internal/middleware"
)
Expand All @@ -46,6 +49,15 @@ type HTTPClientSettings struct {
// Existing header values are overwritten if collision happens.
Headers map[string]string `mapstructure:"headers,omitempty"`

// The compression key for supported compression types within
// collector. Currently the only supported mode is `gzip`.
Compression string `mapstructure:"compression"`

// EnableCompression must be set to true for Compression field to have effect.
// Set it to true in your CreateDefaultConfig if you want your exporter to
// have the "compression" setting supported.
EnableCompression bool `mapstructure:"-"`

// Custom Round Tripper to allow for individual components to intercept HTTP requests
CustomRoundTripper func(next http.RoundTripper) (http.RoundTripper, error)
}
Expand Down Expand Up @@ -74,6 +86,18 @@ func (hcs *HTTPClientSettings) ToClient() (*http.Client, error) {
}
}

if hcs.Compression != "" {
if !hcs.EnableCompression {
return nil, fmt.Errorf("unsupported option \"compression\"")
}

if strings.ToLower(hcs.Compression) == configgrpc.CompressionGzip {
clientTransport = middleware.NewCompressRoundTripper(clientTransport)
} else {
return nil, fmt.Errorf("unsupported compression type %q", hcs.Compression)
}
}

if hcs.CustomRoundTripper != nil {
clientTransport, err = hcs.CustomRoundTripper(clientTransport)
if err != nil {
Expand Down
34 changes: 31 additions & 3 deletions config/confighttp/confighttp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,32 @@ func TestAllHTTPClientSettings(t *testing.T) {
},
shouldError: true,
},
{
name: "error_compression_without_enabling",
settings: HTTPClientSettings{
Endpoint: "localhost:1234",
Compression: "gzip",
},
shouldError: true,
},
{
name: "compression_gzip",
settings: HTTPClientSettings{
Endpoint: "localhost:1234",
Compression: "gzip",
EnableCompression: true,
},
shouldError: false,
},
{
name: "compression_unknown",
settings: HTTPClientSettings{
Endpoint: "localhost:1234",
Compression: "unknown",
EnableCompression: true,
},
shouldError: true,
},
}

for _, test := range tests {
Expand All @@ -73,9 +99,11 @@ func TestAllHTTPClientSettings(t *testing.T) {
return
}
assert.NoError(t, err)
transport := client.Transport.(*http.Transport)
assert.EqualValues(t, 1024, transport.ReadBufferSize)
assert.EqualValues(t, 512, transport.WriteBufferSize)
transport, ok := client.Transport.(*http.Transport)
if ok {
assert.EqualValues(t, 1024, transport.ReadBufferSize)
assert.EqualValues(t, 512, transport.WriteBufferSize)
}
})
}
}
Expand Down
2 changes: 2 additions & 0 deletions exporter/otlphttpexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ The following settings can be optionally configured:
- `key_file` path to the TLS key to use for TLS required connections. Should
only be used if `insecure` is set to false.

- `compression` (default = none): Compression type to use (only gzip is supported today)

- `timeout` (default = 30s): HTTP request time limit. For details see https://golang.org/pkg/net/http/#Client
- `read_buffer_size` (default = 0): ReadBufferSize for HTTP client.
- `write_buffer_size` (default = 512 * 1024): WriteBufferSize for HTTP client.
Expand Down
1 change: 1 addition & 0 deletions exporter/otlphttpexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func TestLoadConfig(t *testing.T) {
QueueSize: 10,
},
HTTPClientSettings: confighttp.HTTPClientSettings{
EnableCompression: true,
Headers: map[string]string{
"can you have a . here?": "F0000000-0000-0000-0000-000000000000",
"header1": "234",
Expand Down
7 changes: 4 additions & 3 deletions exporter/otlphttpexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ func createDefaultConfig() configmodels.Exporter {
RetrySettings: exporterhelper.DefaultRetrySettings(),
QueueSettings: exporterhelper.DefaultQueueSettings(),
HTTPClientSettings: confighttp.HTTPClientSettings{
Endpoint: "",
Timeout: 30 * time.Second,
Headers: map[string]string{},
EnableCompression: true,
Endpoint: "",
Timeout: 30 * time.Second,
Headers: map[string]string{},
// We almost read 0 bytes, so no need to tune ReadBufferSize.
WriteBufferSize: 512 * 1024,
},
Expand Down
55 changes: 55 additions & 0 deletions exporter/otlphttpexporter/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,61 @@ func TestTraceRoundTrip(t *testing.T) {
}
}

func TestCompressionOptions(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)

tests := []struct {
name string
baseURL string
compression string
err bool
}{
{
name: "no compression",
baseURL: fmt.Sprintf("http://%s", addr),
compression: "",
},
{
name: "gzip",
baseURL: fmt.Sprintf("http://%s", addr),
compression: "gzip",
},
{
name: "incorrect compression",
baseURL: fmt.Sprintf("http://%s", addr),
compression: "gzip2",
err: true,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
sink := new(consumertest.TracesSink)
startTraceReceiver(t, addr, sink)

factory := NewFactory()
cfg := createExporterConfig(test.baseURL, factory.CreateDefaultConfig())
cfg.Compression = test.compression
exp, err := factory.CreateTracesExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg)
if test.err {
assert.Error(t, err)
return
}
require.NoError(t, err)
startAndCleanup(t, exp)

td := testdata.GenerateTraceDataOneSpan()
assert.NoError(t, exp.ConsumeTraces(context.Background(), td))
require.Eventually(t, func() bool {
return sink.SpansCount() > 0
}, 1*time.Second, 10*time.Millisecond)
allTraces := sink.AllTraces()
require.Len(t, allTraces, 1)
assert.EqualValues(t, td, allTraces[0])
})
}
}

func TestMetricsError(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)

Expand Down
59 changes: 59 additions & 0 deletions internal/middleware/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,71 @@
package middleware

import (
"bytes"
"compress/gzip"
"compress/zlib"
"io"
"net/http"
)

const (
headerContentEncoding = "Content-Encoding"
headerValueGZIP = "gzip"
)

type CompressRoundTripper struct {
http.RoundTripper
}

func NewCompressRoundTripper(rt http.RoundTripper) *CompressRoundTripper {
return &CompressRoundTripper{
RoundTripper: rt,
}
}

func (r *CompressRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {

if req.Header.Get(headerContentEncoding) != "" {
// If the header already specifies a content encoding then skip compression
// since we don't want to compress it again. This is a safeguard that normally
// should not happen since CompressRoundTripper is not intended to be used
// with http clients which already do their own compression.
return r.RoundTripper.RoundTrip(req)
}

gzipWriter := gzip.NewWriter(nil)

// Gzip the body.
buf := bytes.NewBuffer([]byte{})
gzipWriter.Reset(buf)
_, copyErr := io.Copy(gzipWriter, req.Body)
closeErr := req.Body.Close()

if err := gzipWriter.Close(); err != nil {
return nil, err
}

if copyErr != nil {
return nil, copyErr
}
if closeErr != nil {
return nil, closeErr
}

// Create a new request since the docs say that we cannot modify the "req"
// (see https://golang.org/pkg/net/http/#RoundTripper).
cReq, err := http.NewRequestWithContext(req.Context(), req.Method, req.URL.String(), buf)
if err != nil {
return nil, err
}

// Clone the headers and add gzip encoding header.
cReq.Header = req.Header.Clone()
cReq.Header.Add(headerContentEncoding, headerValueGZIP)

return r.RoundTripper.RoundTrip(cReq)
}

type ErrorHandler func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int)

type decompressor struct {
Expand Down
61 changes: 61 additions & 0 deletions internal/middleware/compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,67 @@ import (
"go.opentelemetry.io/collector/testutil"
)

func TestHTTPClientCompression(t *testing.T) {
testBody := []byte("uncompressed_text")
compressedBody, _ := compressGzip(testBody)

tests := []struct {
name string
encoding string
reqBody []byte
}{
{
name: "NoCompression",
encoding: "",
reqBody: testBody,
},
{
name: "ValidGzip",
encoding: "gzip",
reqBody: compressedBody.Bytes(),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
require.NoError(t, err, "failed to read request body: %v", err)
assert.EqualValues(t, tt.reqBody, body)
w.WriteHeader(200)
})

addr := testutil.GetAvailableLocalAddress(t)
ln, err := net.Listen("tcp", addr)
require.NoError(t, err, "failed to create listener: %v", err)
srv := &http.Server{
Handler: handler,
}
go func() {
_ = srv.Serve(ln)
}()
// Wait for the servers to start
<-time.After(10 * time.Millisecond)

serverURL := fmt.Sprintf("http://%s", ln.Addr().String())
reqBody := bytes.NewBuffer(testBody)

req, err := http.NewRequest("GET", serverURL, reqBody)
require.NoError(t, err, "failed to create request to test handler")

client := http.Client{}
if tt.encoding == "gzip" {
client.Transport = NewCompressRoundTripper(http.DefaultTransport)
}
res, err := client.Do(req)
require.NoError(t, err)

ioutil.ReadAll(res.Body)
require.NoError(t, res.Body.Close(), "failed to close request body: %v", err)
require.NoError(t, srv.Close())
})
}
}

func TestHTTPContentDecompressionHandler(t *testing.T) {
testBody := []byte("uncompressed_text")
tests := []struct {
Expand Down
Loading

0 comments on commit 360d81f

Please sign in to comment.