From 1f78c79f549461c92a005c164428a6fc1965e458 Mon Sep 17 00:00:00 2001 From: Tigran Najaryan Date: Tue, 16 Feb 2021 20:12:33 -0500 Subject: [PATCH] Add "compression" option to otlphttp exporter The default value of "compression" option is empty and results in no compression, which is the same as the old behavior. You can also specify "compression: gzip" which will result in gzip encoding of outgoing http requests. --- exporter/otlphttpexporter/README.md | 2 + exporter/otlphttpexporter/config.go | 4 ++ exporter/otlphttpexporter/config_test.go | 1 + exporter/otlphttpexporter/otlp.go | 11 ++++ exporter/otlphttpexporter/otlp_test.go | 55 +++++++++++++++++ .../otlphttpexporter/testdata/config.yaml | 1 + internal/middleware/compression.go | 57 +++++++++++++++++ internal/middleware/compression_test.go | 61 +++++++++++++++++++ testbed/testbed/receivers.go | 35 +++++++---- testbed/tests/trace_test.go | 20 +++++- 10 files changed, 235 insertions(+), 12 deletions(-) diff --git a/exporter/otlphttpexporter/README.md b/exporter/otlphttpexporter/README.md index 8c42bca107b..7dd10b47f07 100644 --- a/exporter/otlphttpexporter/README.md +++ b/exporter/otlphttpexporter/README.md @@ -33,6 +33,8 @@ The following settings can be optionally configured: - `key_file` path to the TLS key to use for TLS required connections. Should only be used if `insecure` is set to false. +- `compression` (default = none): Compression type to use (only gzip is supported today) + - `timeout` (default = 30s): HTTP request time limit. For details see https://golang.org/pkg/net/http/#Client - `read_buffer_size` (default = 0): ReadBufferSize for HTTP client. - `write_buffer_size` (default = 512 * 1024): WriteBufferSize for HTTP client. diff --git a/exporter/otlphttpexporter/config.go b/exporter/otlphttpexporter/config.go index 6eb8c76efa3..7dd17702e5f 100644 --- a/exporter/otlphttpexporter/config.go +++ b/exporter/otlphttpexporter/config.go @@ -35,4 +35,8 @@ type Config struct { // The URL to send logs to. If omitted the Endpoint + "/v1/logs" will be used. LogsEndpoint string `mapstructure:"logs_endpoint"` + + // The compression key for supported compression types within + // collector. Currently the only supported mode is `gzip`. + Compression string `mapstructure:"compression"` } diff --git a/exporter/otlphttpexporter/config_test.go b/exporter/otlphttpexporter/config_test.go index a0d3aa83a3b..c302fe76e8c 100644 --- a/exporter/otlphttpexporter/config_test.go +++ b/exporter/otlphttpexporter/config_test.go @@ -81,5 +81,6 @@ func TestLoadConfig(t *testing.T) { WriteBufferSize: 345, Timeout: time.Second * 10, }, + Compression: "gzip", }) } diff --git a/exporter/otlphttpexporter/otlp.go b/exporter/otlphttpexporter/otlp.go index fdb99579b2c..252478acedb 100644 --- a/exporter/otlphttpexporter/otlp.go +++ b/exporter/otlphttpexporter/otlp.go @@ -24,16 +24,19 @@ import ( "net/http" "net/url" "strconv" + "strings" "time" "go.uber.org/zap" "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/protobuf/proto" + "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/exporter/exporterhelper" + "go.opentelemetry.io/collector/internal/middleware" ) type exporterImp struct { @@ -67,6 +70,14 @@ func newExporter(cfg configmodels.Exporter, logger *zap.Logger) (*exporterImp, e return nil, err } + if oCfg.Compression != "" { + if strings.ToLower(oCfg.Compression) == configgrpc.CompressionGzip { + client.Transport = middleware.NewCompressRoundTripper(client.Transport) + } else { + return nil, fmt.Errorf("unsupported compression type %q", oCfg.Compression) + } + } + return &exporterImp{ config: oCfg, client: client, diff --git a/exporter/otlphttpexporter/otlp_test.go b/exporter/otlphttpexporter/otlp_test.go index f9bc3ce98df..bf1fda8f743 100644 --- a/exporter/otlphttpexporter/otlp_test.go +++ b/exporter/otlphttpexporter/otlp_test.go @@ -132,6 +132,61 @@ func TestTraceRoundTrip(t *testing.T) { } } +func TestCompressionOptions(t *testing.T) { + addr := testutil.GetAvailableLocalAddress(t) + + tests := []struct { + name string + baseURL string + compression string + err bool + }{ + { + name: "no compression", + baseURL: fmt.Sprintf("http://%s", addr), + compression: "", + }, + { + name: "gzip", + baseURL: fmt.Sprintf("http://%s", addr), + compression: "gzip", + }, + { + name: "incorrect compression", + baseURL: fmt.Sprintf("http://%s", addr), + compression: "gzip2", + err: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + sink := new(consumertest.TracesSink) + startTraceReceiver(t, addr, sink) + + factory := NewFactory() + cfg := createExporterConfig(test.baseURL, factory.CreateDefaultConfig()) + cfg.Compression = test.compression + exp, err := factory.CreateTracesExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg) + if test.err { + assert.Error(t, err) + return + } + require.NoError(t, err) + startAndCleanup(t, exp) + + td := testdata.GenerateTraceDataOneSpan() + assert.NoError(t, exp.ConsumeTraces(context.Background(), td)) + require.Eventually(t, func() bool { + return sink.SpansCount() > 0 + }, 1*time.Second, 10*time.Millisecond) + allTraces := sink.AllTraces() + require.Len(t, allTraces, 1) + assert.EqualValues(t, td, allTraces[0]) + }) + } +} + func TestMetricsError(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) diff --git a/exporter/otlphttpexporter/testdata/config.yaml b/exporter/otlphttpexporter/testdata/config.yaml index bf6bb642445..9fbbe10c8a2 100644 --- a/exporter/otlphttpexporter/testdata/config.yaml +++ b/exporter/otlphttpexporter/testdata/config.yaml @@ -28,6 +28,7 @@ exporters: "can you have a . here?": "F0000000-0000-0000-0000-000000000000" header1: 234 another: "somevalue" + compression: gzip service: pipelines: diff --git a/internal/middleware/compression.go b/internal/middleware/compression.go index 13504e80953..8b0f3cdd09e 100644 --- a/internal/middleware/compression.go +++ b/internal/middleware/compression.go @@ -15,12 +15,69 @@ package middleware import ( + "bytes" "compress/gzip" "compress/zlib" "io" "net/http" ) +const ( + headerContentEncoding = "Content-Encoding" + headerValueGZIP = "gzip" +) + +type CompressRoundTripper struct { + http.RoundTripper +} + +func NewCompressRoundTripper(rt http.RoundTripper) *CompressRoundTripper { + return &CompressRoundTripper{ + RoundTripper: rt, + } +} + +func (r *CompressRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + + if req.Header.Get(headerContentEncoding) != "" { + // If the header already specifies a content encoding then skip compression + // since we don't want to compress it again. This is a safeguard that normally + // should not happen since CompressRoundTripper is not intended to be used + // with http clients which already do their own compression. + return r.RoundTripper.RoundTrip(req) + } + + // Gzip the body. + buf := bytes.NewBuffer([]byte{}) + gzipWriter := gzip.NewWriter(buf) + _, copyErr := io.Copy(gzipWriter, req.Body) + closeErr := req.Body.Close() + + if err := gzipWriter.Close(); err != nil { + return nil, err + } + + if copyErr != nil { + return nil, copyErr + } + if closeErr != nil { + return nil, closeErr + } + + // Create a new request since the docs say that we cannot modify the "req" + // (see https://golang.org/pkg/net/http/#RoundTripper). + cReq, err := http.NewRequestWithContext(req.Context(), req.Method, req.URL.String(), buf) + if err != nil { + return nil, err + } + + // Clone the headers and add gzip encoding header. + cReq.Header = req.Header.Clone() + cReq.Header.Add(headerContentEncoding, headerValueGZIP) + + return r.RoundTripper.RoundTrip(cReq) +} + type ErrorHandler func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int) type decompressor struct { diff --git a/internal/middleware/compression_test.go b/internal/middleware/compression_test.go index e8770ff4c3e..a303851679b 100644 --- a/internal/middleware/compression_test.go +++ b/internal/middleware/compression_test.go @@ -31,6 +31,67 @@ import ( "go.opentelemetry.io/collector/testutil" ) +func TestHTTPClientCompression(t *testing.T) { + testBody := []byte("uncompressed_text") + compressedBody, _ := compressGzip(testBody) + + tests := []struct { + name string + encoding string + reqBody []byte + }{ + { + name: "NoCompression", + encoding: "", + reqBody: testBody, + }, + { + name: "ValidGzip", + encoding: "gzip", + reqBody: compressedBody.Bytes(), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, err := ioutil.ReadAll(r.Body) + require.NoError(t, err, "failed to read request body: %v", err) + assert.EqualValues(t, tt.reqBody, body) + w.WriteHeader(200) + }) + + addr := testutil.GetAvailableLocalAddress(t) + ln, err := net.Listen("tcp", addr) + require.NoError(t, err, "failed to create listener: %v", err) + srv := &http.Server{ + Handler: handler, + } + go func() { + _ = srv.Serve(ln) + }() + // Wait for the servers to start + <-time.After(10 * time.Millisecond) + + serverURL := fmt.Sprintf("http://%s", ln.Addr().String()) + reqBody := bytes.NewBuffer(testBody) + + req, err := http.NewRequest("GET", serverURL, reqBody) + require.NoError(t, err, "failed to create request to test handler") + + client := http.Client{} + if tt.encoding == "gzip" { + client.Transport = NewCompressRoundTripper(http.DefaultTransport) + } + res, err := client.Do(req) + require.NoError(t, err) + + ioutil.ReadAll(res.Body) + require.NoError(t, res.Body.Close(), "failed to close request body: %v", err) + require.NoError(t, srv.Close()) + }) + } +} + func TestHTTPContentDecompressionHandler(t *testing.T) { testBody := []byte("uncompressed_text") tests := []struct { diff --git a/testbed/testbed/receivers.go b/testbed/testbed/receivers.go index cb9e8ce6712..fe45604ea32 100644 --- a/testbed/testbed/receivers.go +++ b/testbed/testbed/receivers.go @@ -186,17 +186,18 @@ func (jr *JaegerDataReceiver) ProtocolName() string { return "jaeger" } -// baseOTLPDataReceiver implements the OTLP format receiver. -type baseOTLPDataReceiver struct { +// BaseOTLPDataReceiver implements the OTLP format receiver. +type BaseOTLPDataReceiver struct { DataReceiverBase // One of the "otlp" for OTLP over gRPC or "otlphttp" for OTLP over HTTP. exporterType string traceReceiver component.TracesReceiver metricsReceiver component.MetricsReceiver logReceiver component.LogsReceiver + compression string } -func (bor *baseOTLPDataReceiver) Start(tc consumer.TracesConsumer, mc consumer.MetricsConsumer, lc consumer.LogsConsumer) error { +func (bor *BaseOTLPDataReceiver) Start(tc consumer.TracesConsumer, mc consumer.MetricsConsumer, lc consumer.LogsConsumer) error { factory := otlpreceiver.NewFactory() cfg := factory.CreateDefaultConfig().(*otlpreceiver.Config) cfg.SetName(bor.exporterType) @@ -228,7 +229,12 @@ func (bor *baseOTLPDataReceiver) Start(tc consumer.TracesConsumer, mc consumer.M return bor.logReceiver.Start(context.Background(), bor) } -func (bor *baseOTLPDataReceiver) Stop() error { +func (bor *BaseOTLPDataReceiver) WithCompression(compression string) *BaseOTLPDataReceiver { + bor.compression = compression + return bor +} + +func (bor *BaseOTLPDataReceiver) Stop() error { if err := bor.traceReceiver.Shutdown(context.Background()); err != nil { return err } @@ -238,28 +244,35 @@ func (bor *baseOTLPDataReceiver) Stop() error { return bor.logReceiver.Shutdown(context.Background()) } -func (bor *baseOTLPDataReceiver) ProtocolName() string { +func (bor *BaseOTLPDataReceiver) ProtocolName() string { return bor.exporterType } -func (bor *baseOTLPDataReceiver) GenConfigYAMLStr() string { +func (bor *BaseOTLPDataReceiver) GenConfigYAMLStr() string { addr := fmt.Sprintf("localhost:%d", bor.Port) if bor.exporterType == "otlphttp" { addr = "http://" + addr } // Note that this generates an exporter config for agent. - return fmt.Sprintf(` + str := fmt.Sprintf(` %s: endpoint: "%s" insecure: true`, bor.exporterType, addr) + + if bor.compression != "" { + str += fmt.Sprintf(` + compression: "%s"`, bor.compression) + } + + return str } const DefaultOTLPPort = 55680 // NewOTLPDataReceiver creates a new OTLP DataReceiver that will listen on the specified port after Start // is called. -func NewOTLPDataReceiver(port int) DataReceiver { - return &baseOTLPDataReceiver{ +func NewOTLPDataReceiver(port int) *BaseOTLPDataReceiver { + return &BaseOTLPDataReceiver{ DataReceiverBase: DataReceiverBase{Port: port}, exporterType: "otlp", } @@ -267,8 +280,8 @@ func NewOTLPDataReceiver(port int) DataReceiver { // NewOTLPDataReceiver creates a new OTLP/HTTP DataReceiver that will listen on the specified port after Start // is called. -func NewOTLPHTTPDataReceiver(port int) DataReceiver { - return &baseOTLPDataReceiver{ +func NewOTLPHTTPDataReceiver(port int) *BaseOTLPDataReceiver { + return &BaseOTLPDataReceiver{ DataReceiverBase: DataReceiverBase{Port: port}, exporterType: "otlphttp", } diff --git a/testbed/tests/trace_test.go b/testbed/tests/trace_test.go index d41f4535d12..0d3d1ac98d3 100644 --- a/testbed/tests/trace_test.go +++ b/testbed/tests/trace_test.go @@ -66,7 +66,7 @@ func TestTrace10kSPS(t *testing.T) { }, }, { - "OTLP", + "OTLP-gRPC", testbed.NewOTLPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)), testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)), testbed.ResourceSpec{ @@ -74,6 +74,15 @@ func TestTrace10kSPS(t *testing.T) { ExpectedMaxRAM: 70, }, }, + { + "OTLP-gRPC-gzip", + testbed.NewOTLPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)), + testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)).WithCompression("gzip"), + testbed.ResourceSpec{ + ExpectedMaxCPU: 30, + ExpectedMaxRAM: 100, + }, + }, { "OTLP-HTTP", testbed.NewOTLPHTTPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)), @@ -83,6 +92,15 @@ func TestTrace10kSPS(t *testing.T) { ExpectedMaxRAM: 100, }, }, + { + "OTLP-HTTP-gzip", + testbed.NewOTLPHTTPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)), + testbed.NewOTLPHTTPDataReceiver(testbed.GetAvailablePort(t)).WithCompression("gzip"), + testbed.ResourceSpec{ + ExpectedMaxCPU: 25, + ExpectedMaxRAM: 100, + }, + }, { "Zipkin", testbed.NewZipkinDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)),