Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add "compression" option to otlphttp exporter #2502

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions exporter/otlphttpexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ The following settings can be optionally configured:
- `key_file` path to the TLS key to use for TLS required connections. Should
only be used if `insecure` is set to false.

- `compression` (default = none): Compression type to use (only gzip is supported today)

- `timeout` (default = 30s): HTTP request time limit. For details see https://golang.org/pkg/net/http/#Client
- `read_buffer_size` (default = 0): ReadBufferSize for HTTP client.
- `write_buffer_size` (default = 512 * 1024): WriteBufferSize for HTTP client.
Expand Down
4 changes: 4 additions & 0 deletions exporter/otlphttpexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,8 @@ type Config struct {

// The URL to send logs to. If omitted the Endpoint + "/v1/logs" will be used.
LogsEndpoint string `mapstructure:"logs_endpoint"`

// The compression key for supported compression types within
// collector. Currently the only supported mode is `gzip`.
Compression string `mapstructure:"compression"`
}
1 change: 1 addition & 0 deletions exporter/otlphttpexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,5 +81,6 @@ func TestLoadConfig(t *testing.T) {
WriteBufferSize: 345,
Timeout: time.Second * 10,
},
Compression: "gzip",
})
}
11 changes: 11 additions & 0 deletions exporter/otlphttpexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,19 @@ import (
"net/http"
"net/url"
"strconv"
"strings"
"time"

"go.uber.org/zap"
"google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/protobuf/proto"

"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/internal/middleware"
)

type exporterImp struct {
Expand Down Expand Up @@ -67,6 +70,14 @@ func newExporter(cfg configmodels.Exporter, logger *zap.Logger) (*exporterImp, e
return nil, err
}

if oCfg.Compression != "" {
if strings.ToLower(oCfg.Compression) == configgrpc.CompressionGzip {
client.Transport = middleware.NewCompressRoundTripper(client.Transport)
} else {
return nil, fmt.Errorf("unsupported compression type %q", oCfg.Compression)
}
}

return &exporterImp{
config: oCfg,
client: client,
Expand Down
55 changes: 55 additions & 0 deletions exporter/otlphttpexporter/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,61 @@ func TestTraceRoundTrip(t *testing.T) {
}
}

func TestCompressionOptions(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)

tests := []struct {
name string
baseURL string
compression string
err bool
}{
{
name: "no compression",
baseURL: fmt.Sprintf("http://%s", addr),
compression: "",
},
{
name: "gzip",
baseURL: fmt.Sprintf("http://%s", addr),
compression: "gzip",
},
{
name: "incorrect compression",
baseURL: fmt.Sprintf("http://%s", addr),
compression: "gzip2",
err: true,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
sink := new(consumertest.TracesSink)
startTraceReceiver(t, addr, sink)

factory := NewFactory()
cfg := createExporterConfig(test.baseURL, factory.CreateDefaultConfig())
cfg.Compression = test.compression
exp, err := factory.CreateTracesExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg)
if test.err {
assert.Error(t, err)
return
}
require.NoError(t, err)
startAndCleanup(t, exp)

td := testdata.GenerateTraceDataOneSpan()
assert.NoError(t, exp.ConsumeTraces(context.Background(), td))
require.Eventually(t, func() bool {
return sink.SpansCount() > 0
}, 1*time.Second, 10*time.Millisecond)
allTraces := sink.AllTraces()
require.Len(t, allTraces, 1)
assert.EqualValues(t, td, allTraces[0])
})
}
}

func TestMetricsError(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)

Expand Down
1 change: 1 addition & 0 deletions exporter/otlphttpexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ exporters:
"can you have a . here?": "F0000000-0000-0000-0000-000000000000"
header1: 234
another: "somevalue"
compression: gzip

service:
pipelines:
Expand Down
57 changes: 57 additions & 0 deletions internal/middleware/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,69 @@
package middleware

import (
"bytes"
"compress/gzip"
"compress/zlib"
"io"
"net/http"
)

const (
headerContentEncoding = "Content-Encoding"
headerValueGZIP = "gzip"
)

type CompressRoundTripper struct {
http.RoundTripper
}

func NewCompressRoundTripper(rt http.RoundTripper) *CompressRoundTripper {
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved
return &CompressRoundTripper{
RoundTripper: rt,
}
}

func (r *CompressRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {

if req.Header.Get(headerContentEncoding) != "" {
// If the header already specifies a content encoding then skip compression
// since we don't want to compress it again. This is a safeguard that normally
// should not happen since CompressRoundTripper is not intended to be used
// with http clients which already do their own compression.
return r.RoundTripper.RoundTrip(req)
}

// Gzip the body.
buf := bytes.NewBuffer([]byte{})
gzipWriter := gzip.NewWriter(buf)
_, copyErr := io.Copy(gzipWriter, req.Body)
closeErr := req.Body.Close()

if err := gzipWriter.Close(); err != nil {
return nil, err
}

if copyErr != nil {
return nil, copyErr
}
if closeErr != nil {
return nil, closeErr
}

// Create a new request since the docs say that we cannot modify the "req"
// (see https://golang.org/pkg/net/http/#RoundTripper).
cReq, err := http.NewRequestWithContext(req.Context(), req.Method, req.URL.String(), buf)
if err != nil {
return nil, err
}

// Clone the headers and add gzip encoding header.
cReq.Header = req.Header.Clone()
cReq.Header.Add(headerContentEncoding, headerValueGZIP)
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved

return r.RoundTripper.RoundTrip(cReq)
}

type ErrorHandler func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int)

type decompressor struct {
Expand Down
61 changes: 61 additions & 0 deletions internal/middleware/compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,67 @@ import (
"go.opentelemetry.io/collector/testutil"
)

func TestHTTPClientCompression(t *testing.T) {
testBody := []byte("uncompressed_text")
compressedBody, _ := compressGzip(testBody)

tests := []struct {
name string
encoding string
reqBody []byte
}{
{
name: "NoCompression",
encoding: "",
reqBody: testBody,
},
{
name: "ValidGzip",
encoding: "gzip",
reqBody: compressedBody.Bytes(),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
require.NoError(t, err, "failed to read request body: %v", err)
assert.EqualValues(t, tt.reqBody, body)
w.WriteHeader(200)
})

addr := testutil.GetAvailableLocalAddress(t)
ln, err := net.Listen("tcp", addr)
require.NoError(t, err, "failed to create listener: %v", err)
srv := &http.Server{
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved
Handler: handler,
}
go func() {
_ = srv.Serve(ln)
}()
// Wait for the servers to start
<-time.After(10 * time.Millisecond)

serverURL := fmt.Sprintf("http://%s", ln.Addr().String())
reqBody := bytes.NewBuffer(testBody)

req, err := http.NewRequest("GET", serverURL, reqBody)
require.NoError(t, err, "failed to create request to test handler")

client := http.Client{}
if tt.encoding == "gzip" {
client.Transport = NewCompressRoundTripper(http.DefaultTransport)
}
res, err := client.Do(req)
require.NoError(t, err)

ioutil.ReadAll(res.Body)
require.NoError(t, res.Body.Close(), "failed to close request body: %v", err)
require.NoError(t, srv.Close())
})
}
}

func TestHTTPContentDecompressionHandler(t *testing.T) {
testBody := []byte("uncompressed_text")
tests := []struct {
Expand Down
35 changes: 24 additions & 11 deletions testbed/testbed/receivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,17 +186,18 @@ func (jr *JaegerDataReceiver) ProtocolName() string {
return "jaeger"
}

// baseOTLPDataReceiver implements the OTLP format receiver.
type baseOTLPDataReceiver struct {
// BaseOTLPDataReceiver implements the OTLP format receiver.
type BaseOTLPDataReceiver struct {
DataReceiverBase
// One of the "otlp" for OTLP over gRPC or "otlphttp" for OTLP over HTTP.
exporterType string
traceReceiver component.TracesReceiver
metricsReceiver component.MetricsReceiver
logReceiver component.LogsReceiver
compression string
}

func (bor *baseOTLPDataReceiver) Start(tc consumer.TracesConsumer, mc consumer.MetricsConsumer, lc consumer.LogsConsumer) error {
func (bor *BaseOTLPDataReceiver) Start(tc consumer.TracesConsumer, mc consumer.MetricsConsumer, lc consumer.LogsConsumer) error {
factory := otlpreceiver.NewFactory()
cfg := factory.CreateDefaultConfig().(*otlpreceiver.Config)
cfg.SetName(bor.exporterType)
Expand Down Expand Up @@ -228,7 +229,12 @@ func (bor *baseOTLPDataReceiver) Start(tc consumer.TracesConsumer, mc consumer.M
return bor.logReceiver.Start(context.Background(), bor)
}

func (bor *baseOTLPDataReceiver) Stop() error {
func (bor *BaseOTLPDataReceiver) WithCompression(compression string) *BaseOTLPDataReceiver {
bor.compression = compression
return bor
}

func (bor *BaseOTLPDataReceiver) Stop() error {
if err := bor.traceReceiver.Shutdown(context.Background()); err != nil {
return err
}
Expand All @@ -238,37 +244,44 @@ func (bor *baseOTLPDataReceiver) Stop() error {
return bor.logReceiver.Shutdown(context.Background())
}

func (bor *baseOTLPDataReceiver) ProtocolName() string {
func (bor *BaseOTLPDataReceiver) ProtocolName() string {
return bor.exporterType
}

func (bor *baseOTLPDataReceiver) GenConfigYAMLStr() string {
func (bor *BaseOTLPDataReceiver) GenConfigYAMLStr() string {
addr := fmt.Sprintf("localhost:%d", bor.Port)
if bor.exporterType == "otlphttp" {
addr = "http://" + addr
}
// Note that this generates an exporter config for agent.
return fmt.Sprintf(`
str := fmt.Sprintf(`
%s:
endpoint: "%s"
insecure: true`, bor.exporterType, addr)

if bor.compression != "" {
str += fmt.Sprintf(`
compression: "%s"`, bor.compression)
}

return str
}

const DefaultOTLPPort = 55680

// NewOTLPDataReceiver creates a new OTLP DataReceiver that will listen on the specified port after Start
// is called.
func NewOTLPDataReceiver(port int) DataReceiver {
return &baseOTLPDataReceiver{
func NewOTLPDataReceiver(port int) *BaseOTLPDataReceiver {
return &BaseOTLPDataReceiver{
DataReceiverBase: DataReceiverBase{Port: port},
exporterType: "otlp",
}
}

// NewOTLPDataReceiver creates a new OTLP/HTTP DataReceiver that will listen on the specified port after Start
// is called.
func NewOTLPHTTPDataReceiver(port int) DataReceiver {
return &baseOTLPDataReceiver{
func NewOTLPHTTPDataReceiver(port int) *BaseOTLPDataReceiver {
return &BaseOTLPDataReceiver{
DataReceiverBase: DataReceiverBase{Port: port},
exporterType: "otlphttp",
}
Expand Down
20 changes: 19 additions & 1 deletion testbed/tests/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,23 @@ func TestTrace10kSPS(t *testing.T) {
},
},
{
"OTLP",
"OTLP-gRPC",
testbed.NewOTLPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)),
testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)),
testbed.ResourceSpec{
ExpectedMaxCPU: 20,
ExpectedMaxRAM: 70,
},
},
{
"OTLP-gRPC-gzip",
testbed.NewOTLPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)),
testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)).WithCompression("gzip"),
testbed.ResourceSpec{
ExpectedMaxCPU: 30,
ExpectedMaxRAM: 100,
},
},
{
"OTLP-HTTP",
testbed.NewOTLPHTTPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)),
Expand All @@ -83,6 +92,15 @@ func TestTrace10kSPS(t *testing.T) {
ExpectedMaxRAM: 100,
},
},
{
"OTLP-HTTP-gzip",
testbed.NewOTLPHTTPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)),
testbed.NewOTLPHTTPDataReceiver(testbed.GetAvailablePort(t)).WithCompression("gzip"),
testbed.ResourceSpec{
ExpectedMaxCPU: 25,
ExpectedMaxRAM: 100,
},
},
{
"Zipkin",
testbed.NewZipkinDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)),
Expand Down