Skip to content

Commit

Permalink
Fixes #1858: Allow specifying a custom HTTP transport for the otlphtt…
Browse files Browse the repository at this point in the history
…p driver
  • Loading branch information
arnogeurts-sqills committed May 5, 2021
1 parent cbcd4b1 commit 3b57603
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 35 deletions.
41 changes: 33 additions & 8 deletions exporters/otlp/internal/otlpconfig/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package otlpconfig // import "go.opentelemetry.io/otel/exporters/otlp/internal/o
import (
"crypto/tls"
"fmt"
"net/http"
"time"

"google.golang.org/grpc"
Expand Down Expand Up @@ -64,6 +65,9 @@ type (
Timeout time.Duration
URLPath string

// http configurations
HTTPTransport http.RoundTripper

// gRPC configurations
GRPCCredentials credentials.TransportCredentials
}
Expand All @@ -89,16 +93,18 @@ type (
func NewDefaultConfig() Config {
c := Config{
Traces: SignalConfig{
Endpoint: fmt.Sprintf("%s:%d", otlp.DefaultCollectorHost, otlp.DefaultCollectorPort),
URLPath: DefaultTracesPath,
Compression: otlp.NoCompression,
Timeout: DefaultTimeout,
Endpoint: fmt.Sprintf("%s:%d", otlp.DefaultCollectorHost, otlp.DefaultCollectorPort),
URLPath: DefaultTracesPath,
Compression: otlp.NoCompression,
Timeout: DefaultTimeout,
HTTPTransport: http.DefaultTransport,
},
Metrics: SignalConfig{
Endpoint: fmt.Sprintf("%s:%d", otlp.DefaultCollectorHost, otlp.DefaultCollectorPort),
URLPath: DefaultMetricsPath,
Compression: otlp.NoCompression,
Timeout: DefaultTimeout,
Endpoint: fmt.Sprintf("%s:%d", otlp.DefaultCollectorHost, otlp.DefaultCollectorPort),
URLPath: DefaultMetricsPath,
Compression: otlp.NoCompression,
Timeout: DefaultTimeout,
HTTPTransport: http.DefaultTransport,
},
MaxAttempts: DefaultMaxAttempts,
Backoff: DefaultBackoff,
Expand Down Expand Up @@ -364,3 +370,22 @@ func WithBackoff(duration time.Duration) GenericOption {
cfg.Backoff = duration
})
}

func WithMetricsHTTPTransport(transport http.RoundTripper) HTTPOption {
return NewHTTPOption(func(cfg *Config) {
cfg.Metrics.HTTPTransport = transport
})
}

func WithTracesHTTPTransport(transport http.RoundTripper) HTTPOption {
return NewHTTPOption(func(cfg *Config) {
cfg.Traces.HTTPTransport = transport
})
}

func WithHTTPTransport(transport http.RoundTripper) HTTPOption {
return NewHTTPOption(func(cfg *Config) {
cfg.Metrics.HTTPTransport = transport
cfg.Traces.HTTPTransport = transport
})
}
54 changes: 54 additions & 0 deletions exporters/otlp/internal/otlpconfig/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"crypto/tls"
"crypto/x509"
"errors"
"net/http"
"testing"
"time"

Expand Down Expand Up @@ -423,3 +424,56 @@ func TestConfigs(t *testing.T) {
})
}
}

func TestHTTPConfigs(t *testing.T) {
customTransport := http.DefaultTransport.(*http.Transport).Clone()
customTransport.MaxConnsPerHost = 42

tests := []struct {
name string
opts []HTTPOption
asserts func(t *testing.T, c *Config)
}{
// HTTP Transport Tests
{
name: "Test with custom traces transport",
opts: []HTTPOption{
WithTracesHTTPTransport(customTransport),
},
asserts: func(t *testing.T, c *Config) {
assert.Equal(t, customTransport, c.Traces.HTTPTransport)
assert.Equal(t, http.DefaultTransport, c.Metrics.HTTPTransport)
},
},
{
name: "Test with custom metrics transport",
opts: []HTTPOption{
WithMetricsHTTPTransport(customTransport),
},
asserts: func(t *testing.T, c *Config) {
assert.Equal(t, http.DefaultTransport, c.Traces.HTTPTransport)
assert.Equal(t, customTransport, c.Metrics.HTTPTransport)
},
},
{
name: "Test with custom transports",
opts: []HTTPOption{
WithHTTPTransport(customTransport),
},
asserts: func(t *testing.T, c *Config) {
assert.Equal(t, customTransport, c.Traces.HTTPTransport)
assert.Equal(t, customTransport, c.Metrics.HTTPTransport)
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := NewDefaultConfig()
for _, opt := range tt.opts {
opt.ApplyHTTPOption(&cfg)
}
tt.asserts(t, &cfg)
})
}
}
41 changes: 14 additions & 27 deletions exporters/otlp/otlphttp/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"io"
"io/ioutil"
"math/rand"
"net"
"net/http"
"path"
"strings"
Expand All @@ -45,24 +44,6 @@ import (
const contentTypeProto = "application/x-protobuf"
const contentTypeJSON = "application/json"

// Keep it in sync with golang's DefaultTransport from net/http! We
// have our own copy to avoid handling a situation where the
// DefaultTransport is overwritten with some different implementation
// of http.RoundTripper or it's modified by other package.
var ourTransport *http.Transport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}

type driver struct {
metricsDriver signalDriver
tracesDriver signalDriver
Expand Down Expand Up @@ -115,23 +96,29 @@ func NewDriver(opts ...Option) otlp.ProtocolDriver {
}

metricsClient := &http.Client{
Transport: ourTransport,
Transport: cfg.Metrics.HTTPTransport,
Timeout: cfg.Metrics.Timeout,
}
if cfg.Metrics.TLSCfg != nil {
transport := ourTransport.Clone()
transport.TLSClientConfig = cfg.Metrics.TLSCfg
metricsClient.Transport = transport
// only support TLS config when the HTTP transport is a *http.Transport
if tr, ok := metricsClient.Transport.(*http.Transport); ok {
transport := tr.Clone()
transport.TLSClientConfig = cfg.Metrics.TLSCfg
metricsClient.Transport = transport
}
}

tracesClient := &http.Client{
Transport: ourTransport,
Transport: cfg.Traces.HTTPTransport,
Timeout: cfg.Traces.Timeout,
}
if cfg.Traces.TLSCfg != nil {
transport := ourTransport.Clone()
transport.TLSClientConfig = cfg.Traces.TLSCfg
tracesClient.Transport = transport
// only support TLS config when the HTTP transport is a *http.Transport
if tr, ok := tracesClient.Transport.(*http.Transport); ok {
transport := tr.Clone()
transport.TLSClientConfig = cfg.Traces.TLSCfg
tracesClient.Transport = transport
}
}

stopCh := make(chan struct{})
Expand Down
25 changes: 25 additions & 0 deletions exporters/otlp/otlphttp/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package otlphttp

import (
"crypto/tls"
"net/http"
"time"

"go.opentelemetry.io/otel/exporters/otlp"
Expand Down Expand Up @@ -197,3 +198,27 @@ func WithTracesTimeout(duration time.Duration) Option {
func WithMetricsTimeout(duration time.Duration) Option {
return otlpconfig.WithMetricsTimeout(duration)
}

// WithMetricsHTTPTransport can be used to customize the HTTP transport that is used to
// handle the HTTP connection to the metrics server
func WithMetricsHTTPTransport(transport http.RoundTripper) Option {
return otlpconfig.WithMetricsHTTPTransport(transport)
}

// WithTracesHTTPTransport can be used to customize the HTTP transport that is used to
// handle the HTTP connection to the traces server
//
// Note that a client should not provide a otelhttp.Transport to this method, as it will
// create spans for every outgoing request, causing never-ending trace data.
func WithTracesHTTPTransport(transport http.RoundTripper) Option {
return otlpconfig.WithTracesHTTPTransport(transport)
}

// WithTracesHTTPTransport can be used to customize the HTTP transport that is used to
// handle both the HTTP connection to the traces and the metrics server.
//
// Note that a client should not provide a otelhttp.Transport to this method, as it will
// create spans for every outgoing request, causing never-ending trace data.
func WithHTTPTransport(transport http.RoundTripper) Option {
return otlpconfig.WithHTTPTransport(transport)
}

0 comments on commit 3b57603

Please sign in to comment.