Skip to content

Commit

Permalink
Support Snappy and Zstd for confighttp.go (#4441)
Browse files Browse the repository at this point in the history
* feat: add snappy and zstd into confighttp

* fix: format the code

* fix: refactored. tests wip

* feat: add tests

* fix: fmt and lint

* fix: modified readme, removed decompression support, and removed compression from otlp

* fix: add compression on otlphttpexporter/readme

* chore: add changelog

* fix: applied changes from code review

* fix: comments and changelog

* fix: applied changes based on the code review

* fix: changed commend on changelog

* fix: changed map to switch case

* fix: move CompressionType to middleware to remove duplicated value

* fix: implement UnmarshalText for CompressionType

* fix: lint error

* fix: remove recommandation for compression type from readme

* chore: changelog
  • Loading branch information
Hyunuk Lim authored Dec 10, 2021
1 parent 02cab16 commit 4dcb338
Show file tree
Hide file tree
Showing 12 changed files with 285 additions and 111 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

## 💡 Enhancements 💡

- `confighttp`: add client-side compression support. (#4441)
- Each exporter should remove `compression` field if they have and should use `confighttp.HTTPClientSettings`
- Allow more zap logger configs: `disable_caller`, `disable_stacktrace`, `output_paths`, `error_output_paths`, `initial_fields` (#1048)
- `configauth`: add ServerAuthenticator interfaces for HTTP receivers. (#4506)

Expand Down
4 changes: 4 additions & 0 deletions config/confighttp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ README](../configtls/README.md).
- [`read_buffer_size`](https://golang.org/pkg/net/http/#Transport)
- [`timeout`](https://golang.org/pkg/net/http/#Client)
- [`write_buffer_size`](https://golang.org/pkg/net/http/#Transport)
- `compression`: Compression type to use among `gzip`, `zstd`, `snappy`, `zlib`, and `deflate`.
- look at the documentation for the server-side of the communication.
- `none` will be treated as uncompressed, and any other inputs will cause an error.
- [`max_idle_conns`](https://golang.org/pkg/net/http/#Transport)
- [`max_idle_conns_per_host`](https://golang.org/pkg/net/http/#Transport)
- [`max_conns_per_host`](https://golang.org/pkg/net/http/#Transport)
Expand All @@ -38,6 +41,7 @@ exporter:
headers:
test1: "value1"
"test 2": "value 2"
compression: zstd
```
## Server Configuration
Expand Down
9 changes: 9 additions & 0 deletions config/confighttp/confighttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ type HTTPClientSettings struct {
// Auth configuration for outgoing HTTP calls.
Auth *configauth.Authentication `mapstructure:"auth,omitempty"`

// The compression key for supported compression types within collector.
Compression middleware.CompressionType `mapstructure:"compression"`

// MaxIdleConns is used to set a limit to the maximum idle HTTP connections the client can keep open.
// There's an already set value, and we want to override it only if an explicit value provided
MaxIdleConns *int `mapstructure:"max_idle_conns"`
Expand Down Expand Up @@ -133,6 +136,12 @@ func (hcs *HTTPClientSettings) ToClient(ext map[config.ComponentID]component.Ext
}
}

// Compress the body using specified compression methods if non-empty string is provided.
// Supporting gzip, zlib, deflate, snappy, and zstd; none is treated as uncompressed.
if hcs.Compression != middleware.CompressionEmpty && hcs.Compression != middleware.CompressionNone {
clientTransport = middleware.NewCompressRoundTripper(clientTransport, hcs.Compression)
}

if hcs.Auth != nil {
if ext == nil {
return nil, fmt.Errorf("extensions configuration not found")
Expand Down
57 changes: 49 additions & 8 deletions config/confighttp/confighttp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configauth"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/internal/middleware"
)

type customRoundTripper struct {
Expand Down Expand Up @@ -74,6 +75,43 @@ func TestAllHTTPClientSettings(t *testing.T) {
MaxConnsPerHost: &maxConnsPerHost,
IdleConnTimeout: &idleConnTimeout,
CustomRoundTripper: func(next http.RoundTripper) (http.RoundTripper, error) { return next, nil },
Compression: "",
},
shouldError: false,
},
{
name: "all_valid_settings_with_none_compression",
settings: HTTPClientSettings{
Endpoint: "localhost:1234",
TLSSetting: configtls.TLSClientSetting{
Insecure: false,
},
ReadBufferSize: 1024,
WriteBufferSize: 512,
MaxIdleConns: &maxIdleConns,
MaxIdleConnsPerHost: &maxIdleConnsPerHost,
MaxConnsPerHost: &maxConnsPerHost,
IdleConnTimeout: &idleConnTimeout,
CustomRoundTripper: func(next http.RoundTripper) (http.RoundTripper, error) { return next, nil },
Compression: "none",
},
shouldError: false,
},
{
name: "all_valid_settings_with_gzip_compression",
settings: HTTPClientSettings{
Endpoint: "localhost:1234",
TLSSetting: configtls.TLSClientSetting{
Insecure: false,
},
ReadBufferSize: 1024,
WriteBufferSize: 512,
MaxIdleConns: &maxIdleConns,
MaxIdleConnsPerHost: &maxIdleConnsPerHost,
MaxConnsPerHost: &maxConnsPerHost,
IdleConnTimeout: &idleConnTimeout,
CustomRoundTripper: func(next http.RoundTripper) (http.RoundTripper, error) { return next, nil },
Compression: "gzip",
},
shouldError: false,
},
Expand All @@ -100,14 +138,17 @@ func TestAllHTTPClientSettings(t *testing.T) {
return
}
assert.NoError(t, err)
transport := client.Transport.(*http.Transport)
assert.EqualValues(t, 1024, transport.ReadBufferSize)
assert.EqualValues(t, 512, transport.WriteBufferSize)
assert.EqualValues(t, 50, transport.MaxIdleConns)
assert.EqualValues(t, 40, transport.MaxIdleConnsPerHost)
assert.EqualValues(t, 45, transport.MaxConnsPerHost)
assert.EqualValues(t, 30*time.Second, transport.IdleConnTimeout)

switch transport := client.Transport.(type) {
case *http.Transport:
assert.EqualValues(t, 1024, transport.ReadBufferSize)
assert.EqualValues(t, 512, transport.WriteBufferSize)
assert.EqualValues(t, 50, transport.MaxIdleConns)
assert.EqualValues(t, 40, transport.MaxIdleConnsPerHost)
assert.EqualValues(t, 45, transport.MaxConnsPerHost)
assert.EqualValues(t, 30*time.Second, transport.IdleConnTimeout)
case *middleware.CompressRoundTripper:
assert.EqualValues(t, "gzip", transport.CompressionType())
}
})
}
}
Expand Down
4 changes: 0 additions & 4 deletions exporter/otlphttpexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,10 @@ The following settings can be optionally configured:
- `ca_file` path to the CA cert. For a client this verifies the server certificate. Should only be used if `insecure` is set to false.
- `cert_file` path to the TLS cert to use for TLS required connections. Should only be used if `insecure` is set to false.
- `key_file` path to the TLS key to use for TLS required connections. Should only be used if `insecure` is set to false.

- `compression` (default = none): Compression type to use (only gzip is supported today)

- `timeout` (default = 30s): HTTP request time limit. For details see https://golang.org/pkg/net/http/#Client
- `read_buffer_size` (default = 0): ReadBufferSize for HTTP client.
- `write_buffer_size` (default = 512 * 1024): WriteBufferSize for HTTP client.


Example:

```yaml
Expand Down
4 changes: 0 additions & 4 deletions exporter/otlphttpexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ type Config struct {

// The URL to send logs to. If omitted the Endpoint + "/v1/logs" will be used.
LogsEndpoint string `mapstructure:"logs_endpoint"`

// The compression key for supported compression types within
// collector. Currently the only supported mode is `gzip`.
Compression string `mapstructure:"compression"`
}

var _ config.Exporter = (*Config)(nil)
Expand Down
2 changes: 1 addition & 1 deletion exporter/otlphttpexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestLoadConfig(t *testing.T) {
ReadBufferSize: 123,
WriteBufferSize: 345,
Timeout: time.Second * 10,
Compression: "gzip",
},
Compression: "gzip",
})
}
11 changes: 0 additions & 11 deletions exporter/otlphttpexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"net/url"
"runtime"
"strconv"
"strings"
"time"

"go.uber.org/zap"
Expand All @@ -34,10 +33,8 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/internal/middleware"
"go.opentelemetry.io/collector/model/otlpgrpc"
"go.opentelemetry.io/collector/model/pdata"
)
Expand Down Expand Up @@ -89,14 +86,6 @@ func (e *exporter) start(_ context.Context, host component.Host) error {
if err != nil {
return err
}

if e.config.Compression != "" {
if strings.ToLower(e.config.Compression) == configgrpc.CompressionGzip {
client.Transport = middleware.NewCompressRoundTripper(client.Transport)
} else {
return fmt.Errorf("unsupported compression type %q", e.config.Compression)
}
}
e.client = client
return nil
}
Expand Down
57 changes: 0 additions & 57 deletions exporter/otlphttpexporter/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,63 +134,6 @@ func TestTraceRoundTrip(t *testing.T) {
}
}

func TestCompressionOptions(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)

tests := []struct {
name string
baseURL string
compression string
err bool
}{
{
name: "no compression",
baseURL: fmt.Sprintf("http://%s", addr),
compression: "",
},
{
name: "gzip",
baseURL: fmt.Sprintf("http://%s", addr),
compression: "gzip",
},
{
name: "incorrect compression",
baseURL: fmt.Sprintf("http://%s", addr),
compression: "gzip2",
err: true,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
sink := new(consumertest.TracesSink)
startTracesReceiver(t, addr, sink)

factory := NewFactory()
cfg := createExporterConfig(test.baseURL, factory.CreateDefaultConfig())
cfg.Compression = test.compression
exp, _ := factory.CreateTracesExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
err := exp.Start(context.Background(), componenttest.NewNopHost())
t.Cleanup(func() {
require.NoError(t, exp.Shutdown(context.Background()))
})
if test.err {
require.Error(t, err)
return
}

td := testdata.GenerateTracesOneSpan()
assert.NoError(t, exp.ConsumeTraces(context.Background(), td))
require.Eventually(t, func() bool {
return sink.SpanCount() > 0
}, 1*time.Second, 10*time.Millisecond)
allTraces := sink.AllTraces()
require.Len(t, allTraces, 1)
assert.EqualValues(t, td, allTraces[0])
})
}
}

func TestMetricsError(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ require (
contrib.go.opencensus.io/exporter/prometheus v0.4.0
github.com/cenkalti/backoff/v4 v4.1.2
github.com/gogo/protobuf v1.3.2
github.com/golang/snappy v0.0.4
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
github.com/klauspost/compress v1.13.6
github.com/knadh/koanf v1.3.3
github.com/magiconair/properties v1.8.5
github.com/mitchellh/mapstructure v1.4.3
Expand Down Expand Up @@ -50,9 +52,7 @@ require (
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/klauspost/compress v1.13.6 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
Expand Down
Loading

0 comments on commit 4dcb338

Please sign in to comment.