Skip to content

Commit

Permalink
[exporter/splunkhecexporter] expose HTTPClientSettings on splunkhecex…
Browse files Browse the repository at this point in the history
…porter (#16839)
  • Loading branch information
atoulme authored Jan 26, 2023
1 parent 939107a commit 395c731
Show file tree
Hide file tree
Showing 12 changed files with 273 additions and 883 deletions.
16 changes: 16 additions & 0 deletions .chloggen/splunkhecexporter_httpclientsettings.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: splunkhecexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Expose HTTPClientSettings on splunkhecexporter

# One or more tracking issues related to the change
issues: [16838]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
12 changes: 10 additions & 2 deletions exporter/splunkhecexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ The following configuration options can also be configured:
- `source` (no default): Optional Splunk source: https://docs.splunk.com/Splexicon:Source
- `sourcetype` (no default): Optional Splunk source type: https://docs.splunk.com/Splexicon:Sourcetype
- `index` (no default): Splunk index, optional name of the Splunk index targeted
- `max_connections` (default: 100): Maximum HTTP connections to use simultaneously when sending data.
- `max_connections` (default: 100): Maximum HTTP connections to use simultaneously when sending data. Deprecated: use `max_idle_conns` or `max_idle_conns_per_host` instead. See [HTTP settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/confighttp/README.md) for more info.
- `disable_compression` (default: false): Whether to disable gzip compression over HTTP.
- `timeout` (default: 10s): HTTP timeout when sending data.
- `insecure_skip_verify` (default: false): Whether to skip checking the certificate of the HEC endpoint when sending data over HTTPS.
Expand Down Expand Up @@ -92,7 +92,7 @@ exporters:
# Splunk index, optional name of the Splunk index targeted.
index: "metrics"
# Maximum HTTP connections to use simultaneously when sending data. Defaults to 100.
max_connections: 200
max_idle_conns: 200
# Whether to disable gzip compression over HTTP. Defaults to false.
disable_compression: false
# HTTP timeout when sending data. Defaults to 10s.
Expand All @@ -118,5 +118,13 @@ with detailed sample configurations [here](testdata/config.yaml).
This exporter also offers proxy support as documented
[here](https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter#proxy-support).
## Advanced Configuration
Several helper files are leveraged to provide additional capabilities automatically:
- [HTTP settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/confighttp/README.md)
- [TLS and mTLS settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md)
- [Queuing, retry and timeout settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md)
[beta]:https://github.com/open-telemetry/opentelemetry-collector#beta
[contrib]:https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
81 changes: 76 additions & 5 deletions exporter/splunkhecexporter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"bytes"
"context"
"fmt"
"net/http"
"net/url"
"sync"

jsoniter "github.com/json-iterator/go"
Expand All @@ -34,10 +36,12 @@ import (

// client sends the data to the splunk backend.
type client struct {
config *Config
logger *zap.Logger
wg sync.WaitGroup
hecWorker hecWorker
config *Config
logger *zap.Logger
wg sync.WaitGroup
telemetrySettings component.TelemetrySettings
hecWorker hecWorker
buildInfo component.BuildInfo
}

func (c *client) pushMetricsData(
Expand Down Expand Up @@ -615,6 +619,73 @@ func (c *client) stop(context.Context) error {
return nil
}

func (c *client) start(context.Context, component.Host) (err error) {
func (c *client) start(ctx context.Context, host component.Host) (err error) {

httpClient, err := buildHTTPClient(c.config, host, c.telemetrySettings)
if err != nil {
return err
}

if c.config.HecHealthCheckEnabled {
healthCheckURL, _ := c.config.getURL()
healthCheckURL.Path = c.config.HealthPath
if err := checkHecHealth(httpClient, healthCheckURL); err != nil {
return fmt.Errorf("health check failed: %w", err)
}
}
url, _ := c.config.getURL()
c.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(c.config, c.buildInfo)}
return nil
}

func checkHecHealth(client *http.Client, healthCheckURL *url.URL) error {

req, err := http.NewRequest("GET", healthCheckURL.String(), nil)
if err != nil {
return consumererror.NewPermanent(err)
}

resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()

err = splunk.HandleHTTPCode(resp)
if err != nil {
return err
}

return nil
}

func buildHTTPClient(config *Config, host component.Host, telemetrySettings component.TelemetrySettings) (*http.Client, error) {
// we handle compression explicitly.
config.HTTPClientSettings.Compression = ""
if config.MaxConnections != 0 && (config.MaxIdleConns == nil || config.HTTPClientSettings.MaxIdleConnsPerHost == nil) {
telemetrySettings.Logger.Warn("You are using the deprecated `max_connections` option that will be removed soon; use `max_idle_conns` and/or `max_idle_conns_per_host` instead: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/splunkhecexporter#advanced-configuration")
intMaxConns := int(config.MaxConnections)
if config.HTTPClientSettings.MaxIdleConns == nil {
config.HTTPClientSettings.MaxIdleConns = &intMaxConns
}
if config.HTTPClientSettings.MaxIdleConnsPerHost == nil {
config.HTTPClientSettings.MaxIdleConnsPerHost = &intMaxConns
}
}
return config.ToClient(host, telemetrySettings)
}

func buildHTTPHeaders(config *Config, buildInfo component.BuildInfo) map[string]string {
appVersion := config.SplunkAppVersion
if appVersion == "" {
appVersion = buildInfo.Version
}
return map[string]string{
"Connection": "keep-alive",
"Content-Type": "application/json",
"User-Agent": config.SplunkAppName + "/" + appVersion,
"Authorization": splunk.HECTokenHeader + " " + string(config.Token),
"__splunk_app_name": config.SplunkAppName,
"__splunk_app_version": config.SplunkAppVersion,
}
}
29 changes: 12 additions & 17 deletions exporter/splunkhecexporter/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
jsoniter "github.com/json-iterator/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter/exportertest"
Expand Down Expand Up @@ -208,7 +209,7 @@ func runMetricsExport(cfg *Config, metrics pmetric.Metrics, expectedBatchesNum i
}

factory := NewFactory()
cfg.Endpoint = "http://" + listener.Addr().String() + "/services/collector"
cfg.HTTPClientSettings.Endpoint = "http://" + listener.Addr().String() + "/services/collector"
cfg.Token = "1234-1234"

rr := make(chan receivedRequest)
Expand Down Expand Up @@ -258,7 +259,7 @@ func runTraceExport(testConfig *Config, traces ptrace.Traces, expectedBatchesNum

factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.Endpoint = "http://" + listener.Addr().String() + "/services/collector"
cfg.HTTPClientSettings.Endpoint = "http://" + listener.Addr().String() + "/services/collector"
cfg.DisableCompression = testConfig.DisableCompression
cfg.MaxContentLengthTraces = testConfig.MaxContentLengthTraces
cfg.Token = "1234-1234"
Expand Down Expand Up @@ -319,7 +320,7 @@ func runLogExport(cfg *Config, ld plog.Logs, expectedBatchesNum int, t *testing.
panic(err)
}

cfg.Endpoint = "http://" + listener.Addr().String() + "/services/collector"
cfg.HTTPClientSettings.Endpoint = "http://" + listener.Addr().String() + "/services/collector"
cfg.Token = "1234-1234"

rr := make(chan receivedRequest)
Expand Down Expand Up @@ -881,7 +882,7 @@ func TestErrorReceived(t *testing.T) {

factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.Endpoint = "http://" + listener.Addr().String() + "/services/collector"
cfg.HTTPClientSettings.Endpoint = "http://" + listener.Addr().String() + "/services/collector"
// Disable QueueSettings to ensure that we execute the request when calling ConsumeTraces
// otherwise we will not see the error.
cfg.QueueSettings.Enabled = false
Expand Down Expand Up @@ -930,7 +931,7 @@ func TestInvalidURL(t *testing.T) {
cfg.QueueSettings.Enabled = false
// Disable retries to not wait too much time for the return error.
cfg.RetrySettings.Enabled = false
cfg.Endpoint = "ftp://example.com:134"
cfg.HTTPClientSettings.Endpoint = "ftp://example.com:134"
cfg.Token = "1234-1234"
params := exportertest.NewNopCreateSettings()
exporter, err := factory.CreateTracesExporter(context.Background(), params, cfg)
Expand All @@ -957,12 +958,6 @@ func TestInvalidJson(t *testing.T) {
assert.Error(t, err)
}

func TestStartAlwaysReturnsNil(t *testing.T) {
c := client{}
err := c.start(context.Background(), componenttest.NewNopHost())
assert.NoError(t, err)
}

func Test_pushLogData_nil_Logs(t *testing.T) {
tests := []struct {
name func(bool) string
Expand Down Expand Up @@ -1095,7 +1090,7 @@ func Test_pushLogData_ShouldAddResponseTo400Error(t *testing.T) {

// An HTTP client that returns status code 400 and response body responseBody.
httpClient, _ := newTestClient(400, responseBody)
splunkClient.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(config)}
splunkClient.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(config, component.NewDefaultBuildInfo())}
// Sending logs using the client.
err := splunkClient.pushLogData(context.Background(), logs)
// TODO: Uncomment after consumererror.Logs implements method Unwrap.
Expand All @@ -1106,7 +1101,7 @@ func Test_pushLogData_ShouldAddResponseTo400Error(t *testing.T) {

// An HTTP client that returns some other status code other than 400 and response body responseBody.
httpClient, _ = newTestClient(500, responseBody)
splunkClient.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(config)}
splunkClient.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(config, component.NewDefaultBuildInfo())}
// Sending logs using the client.
err = splunkClient.pushLogData(context.Background(), logs)
// TODO: Uncomment after consumererror.Logs implements method Unwrap.
Expand All @@ -1132,7 +1127,7 @@ func Test_pushLogData_ShouldReturnUnsentLogsOnly(t *testing.T) {

// The first record is to be sent successfully, the second one should not
httpClient, _ := newTestClientWithPresetResponses([]int{200, 400}, []string{"OK", "NOK"})
c.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(config)}
c.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(config, component.NewDefaultBuildInfo())}

err := c.pushLogData(context.Background(), logs)
require.Error(t, err)
Expand All @@ -1157,7 +1152,7 @@ func Test_pushLogData_ShouldAddHeadersForProfilingData(t *testing.T) {
var headers *[]http.Header

httpClient, headers := newTestClient(200, "OK")
c.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(config)}
c.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(config, component.NewDefaultBuildInfo())}

// A 300-byte buffer only fits one record (around 200 bytes), so each record will be sent separately
c.config.MaxContentLengthLogs, c.config.DisableCompression = 300, true
Expand Down Expand Up @@ -1223,7 +1218,7 @@ func benchPushLogData(b *testing.B, numResources int, numProfiling int, numNonPr
}

httpClient, _ := newTestClient(200, "OK")
c.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(config)}
c.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(config, component.NewDefaultBuildInfo())}

c.config.MaxContentLengthLogs = bufSize
logs := createLogDataWithCustomLibraries(numResources, []string{"otel.logs", "otel.profiling"}, []int{numNonProfiling, numProfiling})
Expand All @@ -1241,7 +1236,7 @@ func Test_pushLogData_Small_MaxContentLength(t *testing.T) {
c := client{
config: config,
logger: zaptest.NewLogger(t),
hecWorker: &defaultHecWorker{&url.URL{Scheme: "http", Host: "splunk"}, http.DefaultClient, buildHTTPHeaders(config)},
hecWorker: &defaultHecWorker{&url.URL{Scheme: "http", Host: "splunk"}, http.DefaultClient, buildHTTPHeaders(config, component.NewDefaultBuildInfo())},
}
c.config.MaxContentLengthLogs = 1

Expand Down
Loading

0 comments on commit 395c731

Please sign in to comment.