diff --git a/exporter/otlphttpexporter/README.md b/exporter/otlphttpexporter/README.md index 8c42bca107b..7dd10b47f07 100644 --- a/exporter/otlphttpexporter/README.md +++ b/exporter/otlphttpexporter/README.md @@ -33,6 +33,8 @@ The following settings can be optionally configured: - `key_file` path to the TLS key to use for TLS required connections. Should only be used if `insecure` is set to false. +- `compression` (default = none): Compression type to use (only gzip is supported today) + - `timeout` (default = 30s): HTTP request time limit. For details see https://golang.org/pkg/net/http/#Client - `read_buffer_size` (default = 0): ReadBufferSize for HTTP client. - `write_buffer_size` (default = 512 * 1024): WriteBufferSize for HTTP client. diff --git a/exporter/otlphttpexporter/config.go b/exporter/otlphttpexporter/config.go index 6eb8c76efa3..7dd17702e5f 100644 --- a/exporter/otlphttpexporter/config.go +++ b/exporter/otlphttpexporter/config.go @@ -35,4 +35,8 @@ type Config struct { // The URL to send logs to. If omitted the Endpoint + "/v1/logs" will be used. LogsEndpoint string `mapstructure:"logs_endpoint"` + + // The compression key for supported compression types within + // collector. Currently the only supported mode is `gzip`. + Compression string `mapstructure:"compression"` } diff --git a/exporter/otlphttpexporter/config_test.go b/exporter/otlphttpexporter/config_test.go index a0d3aa83a3b..c302fe76e8c 100644 --- a/exporter/otlphttpexporter/config_test.go +++ b/exporter/otlphttpexporter/config_test.go @@ -81,5 +81,6 @@ func TestLoadConfig(t *testing.T) { WriteBufferSize: 345, Timeout: time.Second * 10, }, + Compression: "gzip", }) } diff --git a/exporter/otlphttpexporter/otlp.go b/exporter/otlphttpexporter/otlp.go index fdb99579b2c..252478acedb 100644 --- a/exporter/otlphttpexporter/otlp.go +++ b/exporter/otlphttpexporter/otlp.go @@ -24,16 +24,19 @@ import ( "net/http" "net/url" "strconv" + "strings" "time" "go.uber.org/zap" "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/protobuf/proto" + "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/exporter/exporterhelper" + "go.opentelemetry.io/collector/internal/middleware" ) type exporterImp struct { @@ -67,6 +70,14 @@ func newExporter(cfg configmodels.Exporter, logger *zap.Logger) (*exporterImp, e return nil, err } + if oCfg.Compression != "" { + if strings.ToLower(oCfg.Compression) == configgrpc.CompressionGzip { + client.Transport = middleware.NewCompressRoundTripper(client.Transport) + } else { + return nil, fmt.Errorf("unsupported compression type %q", oCfg.Compression) + } + } + return &exporterImp{ config: oCfg, client: client, diff --git a/exporter/otlphttpexporter/otlp_test.go b/exporter/otlphttpexporter/otlp_test.go index f9bc3ce98df..bf1fda8f743 100644 --- a/exporter/otlphttpexporter/otlp_test.go +++ b/exporter/otlphttpexporter/otlp_test.go @@ -132,6 +132,61 @@ func TestTraceRoundTrip(t *testing.T) { } } +func TestCompressionOptions(t *testing.T) { + addr := testutil.GetAvailableLocalAddress(t) + + tests := []struct { + name string + baseURL string + compression string + err bool + }{ + { + name: "no compression", + baseURL: fmt.Sprintf("http://%s", addr), + compression: "", + }, + { + name: "gzip", + baseURL: fmt.Sprintf("http://%s", addr), + compression: "gzip", + }, + { + name: "incorrect compression", + baseURL: fmt.Sprintf("http://%s", addr), + compression: "gzip2", + err: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + sink := new(consumertest.TracesSink) + startTraceReceiver(t, addr, sink) + + factory := NewFactory() + cfg := createExporterConfig(test.baseURL, factory.CreateDefaultConfig()) + cfg.Compression = test.compression + exp, err := factory.CreateTracesExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg) + if test.err { + assert.Error(t, err) + return + } + require.NoError(t, err) + startAndCleanup(t, exp) + + td := testdata.GenerateTraceDataOneSpan() + assert.NoError(t, exp.ConsumeTraces(context.Background(), td)) + require.Eventually(t, func() bool { + return sink.SpansCount() > 0 + }, 1*time.Second, 10*time.Millisecond) + allTraces := sink.AllTraces() + require.Len(t, allTraces, 1) + assert.EqualValues(t, td, allTraces[0]) + }) + } +} + func TestMetricsError(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) diff --git a/exporter/otlphttpexporter/testdata/config.yaml b/exporter/otlphttpexporter/testdata/config.yaml index bf6bb642445..9fbbe10c8a2 100644 --- a/exporter/otlphttpexporter/testdata/config.yaml +++ b/exporter/otlphttpexporter/testdata/config.yaml @@ -28,6 +28,7 @@ exporters: "can you have a . here?": "F0000000-0000-0000-0000-000000000000" header1: 234 another: "somevalue" + compression: gzip service: pipelines: diff --git a/internal/middleware/compression.go b/internal/middleware/compression.go index 13504e80953..8b0f3cdd09e 100644 --- a/internal/middleware/compression.go +++ b/internal/middleware/compression.go @@ -15,12 +15,69 @@ package middleware import ( + "bytes" "compress/gzip" "compress/zlib" "io" "net/http" ) +const ( + headerContentEncoding = "Content-Encoding" + headerValueGZIP = "gzip" +) + +type CompressRoundTripper struct { + http.RoundTripper +} + +func NewCompressRoundTripper(rt http.RoundTripper) *CompressRoundTripper { + return &CompressRoundTripper{ + RoundTripper: rt, + } +} + +func (r *CompressRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + + if req.Header.Get(headerContentEncoding) != "" { + // If the header already specifies a content encoding then skip compression + // since we don't want to compress it again. This is a safeguard that normally + // should not happen since CompressRoundTripper is not intended to be used + // with http clients which already do their own compression. + return r.RoundTripper.RoundTrip(req) + } + + // Gzip the body. + buf := bytes.NewBuffer([]byte{}) + gzipWriter := gzip.NewWriter(buf) + _, copyErr := io.Copy(gzipWriter, req.Body) + closeErr := req.Body.Close() + + if err := gzipWriter.Close(); err != nil { + return nil, err + } + + if copyErr != nil { + return nil, copyErr + } + if closeErr != nil { + return nil, closeErr + } + + // Create a new request since the docs say that we cannot modify the "req" + // (see https://golang.org/pkg/net/http/#RoundTripper). + cReq, err := http.NewRequestWithContext(req.Context(), req.Method, req.URL.String(), buf) + if err != nil { + return nil, err + } + + // Clone the headers and add gzip encoding header. + cReq.Header = req.Header.Clone() + cReq.Header.Add(headerContentEncoding, headerValueGZIP) + + return r.RoundTripper.RoundTrip(cReq) +} + type ErrorHandler func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int) type decompressor struct { diff --git a/internal/middleware/compression_test.go b/internal/middleware/compression_test.go index e8770ff4c3e..a303851679b 100644 --- a/internal/middleware/compression_test.go +++ b/internal/middleware/compression_test.go @@ -31,6 +31,67 @@ import ( "go.opentelemetry.io/collector/testutil" ) +func TestHTTPClientCompression(t *testing.T) { + testBody := []byte("uncompressed_text") + compressedBody, _ := compressGzip(testBody) + + tests := []struct { + name string + encoding string + reqBody []byte + }{ + { + name: "NoCompression", + encoding: "", + reqBody: testBody, + }, + { + name: "ValidGzip", + encoding: "gzip", + reqBody: compressedBody.Bytes(), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, err := ioutil.ReadAll(r.Body) + require.NoError(t, err, "failed to read request body: %v", err) + assert.EqualValues(t, tt.reqBody, body) + w.WriteHeader(200) + }) + + addr := testutil.GetAvailableLocalAddress(t) + ln, err := net.Listen("tcp", addr) + require.NoError(t, err, "failed to create listener: %v", err) + srv := &http.Server{ + Handler: handler, + } + go func() { + _ = srv.Serve(ln) + }() + // Wait for the servers to start + <-time.After(10 * time.Millisecond) + + serverURL := fmt.Sprintf("http://%s", ln.Addr().String()) + reqBody := bytes.NewBuffer(testBody) + + req, err := http.NewRequest("GET", serverURL, reqBody) + require.NoError(t, err, "failed to create request to test handler") + + client := http.Client{} + if tt.encoding == "gzip" { + client.Transport = NewCompressRoundTripper(http.DefaultTransport) + } + res, err := client.Do(req) + require.NoError(t, err) + + ioutil.ReadAll(res.Body) + require.NoError(t, res.Body.Close(), "failed to close request body: %v", err) + require.NoError(t, srv.Close()) + }) + } +} + func TestHTTPContentDecompressionHandler(t *testing.T) { testBody := []byte("uncompressed_text") tests := []struct { diff --git a/testbed/testbed/receivers.go b/testbed/testbed/receivers.go index cb9e8ce6712..fe45604ea32 100644 --- a/testbed/testbed/receivers.go +++ b/testbed/testbed/receivers.go @@ -186,17 +186,18 @@ func (jr *JaegerDataReceiver) ProtocolName() string { return "jaeger" } -// baseOTLPDataReceiver implements the OTLP format receiver. -type baseOTLPDataReceiver struct { +// BaseOTLPDataReceiver implements the OTLP format receiver. +type BaseOTLPDataReceiver struct { DataReceiverBase // One of the "otlp" for OTLP over gRPC or "otlphttp" for OTLP over HTTP. exporterType string traceReceiver component.TracesReceiver metricsReceiver component.MetricsReceiver logReceiver component.LogsReceiver + compression string } -func (bor *baseOTLPDataReceiver) Start(tc consumer.TracesConsumer, mc consumer.MetricsConsumer, lc consumer.LogsConsumer) error { +func (bor *BaseOTLPDataReceiver) Start(tc consumer.TracesConsumer, mc consumer.MetricsConsumer, lc consumer.LogsConsumer) error { factory := otlpreceiver.NewFactory() cfg := factory.CreateDefaultConfig().(*otlpreceiver.Config) cfg.SetName(bor.exporterType) @@ -228,7 +229,12 @@ func (bor *baseOTLPDataReceiver) Start(tc consumer.TracesConsumer, mc consumer.M return bor.logReceiver.Start(context.Background(), bor) } -func (bor *baseOTLPDataReceiver) Stop() error { +func (bor *BaseOTLPDataReceiver) WithCompression(compression string) *BaseOTLPDataReceiver { + bor.compression = compression + return bor +} + +func (bor *BaseOTLPDataReceiver) Stop() error { if err := bor.traceReceiver.Shutdown(context.Background()); err != nil { return err } @@ -238,28 +244,35 @@ func (bor *baseOTLPDataReceiver) Stop() error { return bor.logReceiver.Shutdown(context.Background()) } -func (bor *baseOTLPDataReceiver) ProtocolName() string { +func (bor *BaseOTLPDataReceiver) ProtocolName() string { return bor.exporterType } -func (bor *baseOTLPDataReceiver) GenConfigYAMLStr() string { +func (bor *BaseOTLPDataReceiver) GenConfigYAMLStr() string { addr := fmt.Sprintf("localhost:%d", bor.Port) if bor.exporterType == "otlphttp" { addr = "http://" + addr } // Note that this generates an exporter config for agent. - return fmt.Sprintf(` + str := fmt.Sprintf(` %s: endpoint: "%s" insecure: true`, bor.exporterType, addr) + + if bor.compression != "" { + str += fmt.Sprintf(` + compression: "%s"`, bor.compression) + } + + return str } const DefaultOTLPPort = 55680 // NewOTLPDataReceiver creates a new OTLP DataReceiver that will listen on the specified port after Start // is called. -func NewOTLPDataReceiver(port int) DataReceiver { - return &baseOTLPDataReceiver{ +func NewOTLPDataReceiver(port int) *BaseOTLPDataReceiver { + return &BaseOTLPDataReceiver{ DataReceiverBase: DataReceiverBase{Port: port}, exporterType: "otlp", } @@ -267,8 +280,8 @@ func NewOTLPDataReceiver(port int) DataReceiver { // NewOTLPDataReceiver creates a new OTLP/HTTP DataReceiver that will listen on the specified port after Start // is called. -func NewOTLPHTTPDataReceiver(port int) DataReceiver { - return &baseOTLPDataReceiver{ +func NewOTLPHTTPDataReceiver(port int) *BaseOTLPDataReceiver { + return &BaseOTLPDataReceiver{ DataReceiverBase: DataReceiverBase{Port: port}, exporterType: "otlphttp", } diff --git a/testbed/tests/trace_test.go b/testbed/tests/trace_test.go index d41f4535d12..0d3d1ac98d3 100644 --- a/testbed/tests/trace_test.go +++ b/testbed/tests/trace_test.go @@ -66,7 +66,7 @@ func TestTrace10kSPS(t *testing.T) { }, }, { - "OTLP", + "OTLP-gRPC", testbed.NewOTLPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)), testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)), testbed.ResourceSpec{ @@ -74,6 +74,15 @@ func TestTrace10kSPS(t *testing.T) { ExpectedMaxRAM: 70, }, }, + { + "OTLP-gRPC-gzip", + testbed.NewOTLPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)), + testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)).WithCompression("gzip"), + testbed.ResourceSpec{ + ExpectedMaxCPU: 30, + ExpectedMaxRAM: 100, + }, + }, { "OTLP-HTTP", testbed.NewOTLPHTTPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)), @@ -83,6 +92,15 @@ func TestTrace10kSPS(t *testing.T) { ExpectedMaxRAM: 100, }, }, + { + "OTLP-HTTP-gzip", + testbed.NewOTLPHTTPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)), + testbed.NewOTLPHTTPDataReceiver(testbed.GetAvailablePort(t)).WithCompression("gzip"), + testbed.ResourceSpec{ + ExpectedMaxCPU: 25, + ExpectedMaxRAM: 100, + }, + }, { "Zipkin", testbed.NewZipkinDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)),