diff --git a/config/configgrpc/README.md b/config/configgrpc/README.md index ef5fe4c0f4b..d66408d75d2 100644 --- a/config/configgrpc/README.md +++ b/config/configgrpc/README.md @@ -16,7 +16,7 @@ configuration. For more information, see [configtls README](../configtls/README.md). - [`balancer_name`](https://github.com/grpc/grpc-go/blob/master/examples/features/load_balancing/README.md) -- `compression` (default = gzip): Compression type to use (only gzip is supported today) +- `compression` (default = none): Compression type to use (only gzip is supported today) - `endpoint`: Valid value syntax available [here](https://github.com/grpc/grpc/blob/master/doc/naming.md) - `headers`: name/value pairs added to the request - [`keepalive`](https://godoc.org/google.golang.org/grpc/keepalive#ClientParameters) diff --git a/config/confighttp/README.md b/config/confighttp/README.md index 8e1c8b5cf75..8f6e05f4986 100644 --- a/config/confighttp/README.md +++ b/config/confighttp/README.md @@ -15,6 +15,7 @@ configuration. For more information, see [configtls README](../configtls/README.md). - `endpoint`: address:port +- `compression` (default = none): Compression type to use (only gzip is supported today) - `headers`: name/value pairs added to the HTTP request headers - [`read_buffer_size`](https://golang.org/pkg/net/http/#Transport) - [`timeout`](https://golang.org/pkg/net/http/#Client) diff --git a/config/confighttp/confighttp.go b/config/confighttp/confighttp.go index 56936c74da8..0f16cc38ff8 100644 --- a/config/confighttp/confighttp.go +++ b/config/confighttp/confighttp.go @@ -16,12 +16,15 @@ package confighttp import ( "crypto/tls" + "fmt" "net" "net/http" + "strings" "time" "github.com/rs/cors" + "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/internal/middleware" ) @@ -46,6 +49,15 @@ type HTTPClientSettings struct { // Existing header values are overwritten if collision happens. Headers map[string]string `mapstructure:"headers,omitempty"` + // The compression key for supported compression types within + // collector. Currently the only supported mode is `gzip`. + Compression string `mapstructure:"compression"` + + // EnableCompression must be set to true for Compression field to have effect. + // Set it to true in your CreateDefaultConfig if you want your exporter to + // have the "compression" setting supported. + EnableCompression bool `mapstructure:"-"` + // Custom Round Tripper to allow for individual components to intercept HTTP requests CustomRoundTripper func(next http.RoundTripper) (http.RoundTripper, error) } @@ -74,6 +86,18 @@ func (hcs *HTTPClientSettings) ToClient() (*http.Client, error) { } } + if hcs.Compression != "" { + if !hcs.EnableCompression { + return nil, fmt.Errorf("unsupported option \"compression\"") + } + + if strings.ToLower(hcs.Compression) == configgrpc.CompressionGzip { + clientTransport = middleware.NewCompressRoundTripper(clientTransport) + } else { + return nil, fmt.Errorf("unsupported compression type %q", hcs.Compression) + } + } + if hcs.CustomRoundTripper != nil { clientTransport, err = hcs.CustomRoundTripper(clientTransport) if err != nil { diff --git a/config/confighttp/confighttp_test.go b/config/confighttp/confighttp_test.go index 66d98e4925a..26e7e93dc27 100644 --- a/config/confighttp/confighttp_test.go +++ b/config/confighttp/confighttp_test.go @@ -63,6 +63,32 @@ func TestAllHTTPClientSettings(t *testing.T) { }, shouldError: true, }, + { + name: "error_compression_without_enabling", + settings: HTTPClientSettings{ + Endpoint: "localhost:1234", + Compression: "gzip", + }, + shouldError: true, + }, + { + name: "compression_gzip", + settings: HTTPClientSettings{ + Endpoint: "localhost:1234", + Compression: "gzip", + EnableCompression: true, + }, + shouldError: false, + }, + { + name: "compression_unknown", + settings: HTTPClientSettings{ + Endpoint: "localhost:1234", + Compression: "unknown", + EnableCompression: true, + }, + shouldError: true, + }, } for _, test := range tests { @@ -73,9 +99,11 @@ func TestAllHTTPClientSettings(t *testing.T) { return } assert.NoError(t, err) - transport := client.Transport.(*http.Transport) - assert.EqualValues(t, 1024, transport.ReadBufferSize) - assert.EqualValues(t, 512, transport.WriteBufferSize) + transport, ok := client.Transport.(*http.Transport) + if ok { + assert.EqualValues(t, 1024, transport.ReadBufferSize) + assert.EqualValues(t, 512, transport.WriteBufferSize) + } }) } } diff --git a/exporter/otlphttpexporter/README.md b/exporter/otlphttpexporter/README.md index 8c42bca107b..7dd10b47f07 100644 --- a/exporter/otlphttpexporter/README.md +++ b/exporter/otlphttpexporter/README.md @@ -33,6 +33,8 @@ The following settings can be optionally configured: - `key_file` path to the TLS key to use for TLS required connections. Should only be used if `insecure` is set to false. +- `compression` (default = none): Compression type to use (only gzip is supported today) + - `timeout` (default = 30s): HTTP request time limit. For details see https://golang.org/pkg/net/http/#Client - `read_buffer_size` (default = 0): ReadBufferSize for HTTP client. - `write_buffer_size` (default = 512 * 1024): WriteBufferSize for HTTP client. diff --git a/exporter/otlphttpexporter/config_test.go b/exporter/otlphttpexporter/config_test.go index a0d3aa83a3b..c9b8fea19d1 100644 --- a/exporter/otlphttpexporter/config_test.go +++ b/exporter/otlphttpexporter/config_test.go @@ -63,6 +63,7 @@ func TestLoadConfig(t *testing.T) { QueueSize: 10, }, HTTPClientSettings: confighttp.HTTPClientSettings{ + EnableCompression: true, Headers: map[string]string{ "can you have a . here?": "F0000000-0000-0000-0000-000000000000", "header1": "234", diff --git a/exporter/otlphttpexporter/factory.go b/exporter/otlphttpexporter/factory.go index 592f0e016ab..4eb1eee141d 100644 --- a/exporter/otlphttpexporter/factory.go +++ b/exporter/otlphttpexporter/factory.go @@ -50,9 +50,10 @@ func createDefaultConfig() configmodels.Exporter { RetrySettings: exporterhelper.DefaultRetrySettings(), QueueSettings: exporterhelper.DefaultQueueSettings(), HTTPClientSettings: confighttp.HTTPClientSettings{ - Endpoint: "", - Timeout: 30 * time.Second, - Headers: map[string]string{}, + EnableCompression: true, + Endpoint: "", + Timeout: 30 * time.Second, + Headers: map[string]string{}, // We almost read 0 bytes, so no need to tune ReadBufferSize. WriteBufferSize: 512 * 1024, }, diff --git a/exporter/otlphttpexporter/otlp_test.go b/exporter/otlphttpexporter/otlp_test.go index f9bc3ce98df..bf1fda8f743 100644 --- a/exporter/otlphttpexporter/otlp_test.go +++ b/exporter/otlphttpexporter/otlp_test.go @@ -132,6 +132,61 @@ func TestTraceRoundTrip(t *testing.T) { } } +func TestCompressionOptions(t *testing.T) { + addr := testutil.GetAvailableLocalAddress(t) + + tests := []struct { + name string + baseURL string + compression string + err bool + }{ + { + name: "no compression", + baseURL: fmt.Sprintf("http://%s", addr), + compression: "", + }, + { + name: "gzip", + baseURL: fmt.Sprintf("http://%s", addr), + compression: "gzip", + }, + { + name: "incorrect compression", + baseURL: fmt.Sprintf("http://%s", addr), + compression: "gzip2", + err: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + sink := new(consumertest.TracesSink) + startTraceReceiver(t, addr, sink) + + factory := NewFactory() + cfg := createExporterConfig(test.baseURL, factory.CreateDefaultConfig()) + cfg.Compression = test.compression + exp, err := factory.CreateTracesExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg) + if test.err { + assert.Error(t, err) + return + } + require.NoError(t, err) + startAndCleanup(t, exp) + + td := testdata.GenerateTraceDataOneSpan() + assert.NoError(t, exp.ConsumeTraces(context.Background(), td)) + require.Eventually(t, func() bool { + return sink.SpansCount() > 0 + }, 1*time.Second, 10*time.Millisecond) + allTraces := sink.AllTraces() + require.Len(t, allTraces, 1) + assert.EqualValues(t, td, allTraces[0]) + }) + } +} + func TestMetricsError(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) diff --git a/internal/middleware/compression.go b/internal/middleware/compression.go index 13504e80953..7e076d637fa 100644 --- a/internal/middleware/compression.go +++ b/internal/middleware/compression.go @@ -15,12 +15,71 @@ package middleware import ( + "bytes" "compress/gzip" "compress/zlib" "io" "net/http" ) +const ( + headerContentEncoding = "Content-Encoding" + headerValueGZIP = "gzip" +) + +type CompressRoundTripper struct { + http.RoundTripper +} + +func NewCompressRoundTripper(rt http.RoundTripper) *CompressRoundTripper { + return &CompressRoundTripper{ + RoundTripper: rt, + } +} + +func (r *CompressRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + + if req.Header.Get(headerContentEncoding) != "" { + // If the header already specifies a content encoding then skip compression + // since we don't want to compress it again. This is a safeguard that normally + // should not happen since CompressRoundTripper is not intended to be used + // with http clients which already do their own compression. + return r.RoundTripper.RoundTrip(req) + } + + gzipWriter := gzip.NewWriter(nil) + + // Gzip the body. + buf := bytes.NewBuffer([]byte{}) + gzipWriter.Reset(buf) + _, copyErr := io.Copy(gzipWriter, req.Body) + closeErr := req.Body.Close() + + if err := gzipWriter.Close(); err != nil { + return nil, err + } + + if copyErr != nil { + return nil, copyErr + } + if closeErr != nil { + return nil, closeErr + } + + // Create a new request since the docs say that we cannot modify the "req" + // (see https://golang.org/pkg/net/http/#RoundTripper). + cReq, err := http.NewRequestWithContext(req.Context(), req.Method, req.URL.String(), buf) + if err != nil { + return nil, err + } + + // Clone the headers and add gzip encoding header. + cReq.Header = req.Header.Clone() + cReq.Header.Add(headerContentEncoding, headerValueGZIP) + + return r.RoundTripper.RoundTrip(cReq) +} + type ErrorHandler func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int) type decompressor struct { diff --git a/internal/middleware/compression_test.go b/internal/middleware/compression_test.go index e8770ff4c3e..a303851679b 100644 --- a/internal/middleware/compression_test.go +++ b/internal/middleware/compression_test.go @@ -31,6 +31,67 @@ import ( "go.opentelemetry.io/collector/testutil" ) +func TestHTTPClientCompression(t *testing.T) { + testBody := []byte("uncompressed_text") + compressedBody, _ := compressGzip(testBody) + + tests := []struct { + name string + encoding string + reqBody []byte + }{ + { + name: "NoCompression", + encoding: "", + reqBody: testBody, + }, + { + name: "ValidGzip", + encoding: "gzip", + reqBody: compressedBody.Bytes(), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, err := ioutil.ReadAll(r.Body) + require.NoError(t, err, "failed to read request body: %v", err) + assert.EqualValues(t, tt.reqBody, body) + w.WriteHeader(200) + }) + + addr := testutil.GetAvailableLocalAddress(t) + ln, err := net.Listen("tcp", addr) + require.NoError(t, err, "failed to create listener: %v", err) + srv := &http.Server{ + Handler: handler, + } + go func() { + _ = srv.Serve(ln) + }() + // Wait for the servers to start + <-time.After(10 * time.Millisecond) + + serverURL := fmt.Sprintf("http://%s", ln.Addr().String()) + reqBody := bytes.NewBuffer(testBody) + + req, err := http.NewRequest("GET", serverURL, reqBody) + require.NoError(t, err, "failed to create request to test handler") + + client := http.Client{} + if tt.encoding == "gzip" { + client.Transport = NewCompressRoundTripper(http.DefaultTransport) + } + res, err := client.Do(req) + require.NoError(t, err) + + ioutil.ReadAll(res.Body) + require.NoError(t, res.Body.Close(), "failed to close request body: %v", err) + require.NoError(t, srv.Close()) + }) + } +} + func TestHTTPContentDecompressionHandler(t *testing.T) { testBody := []byte("uncompressed_text") tests := []struct {