Skip to content

Commit

Permalink
[chore] alternative for splunkhec to use otel (#33924)
Browse files Browse the repository at this point in the history
This is an alternative to
#33664,
since mdatagen doesn't support changing metric names, this substitutes
the opencensus sdk calls with otel directly.

Pinging @atoulme @dmitryax @crobert-1 to guide which of the two
approaches is preferred. This component is the last one blocking the
removal of the opencensus bridge in core.

Closes
#33471
Closes
#29867

~NOTE: tests need to be fixed, but i wanted to propose this alternative
before spending time to fix the tests~

---------

Signed-off-by: Alex Boten <223565+codeboten@users.noreply.github.com>
  • Loading branch information
codeboten authored Jul 8, 2024
1 parent 4a13258 commit f1ba946
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 240 deletions.
6 changes: 5 additions & 1 deletion exporter/splunkhecexporter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ import (
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/otel/metric"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk"
)

Expand Down Expand Up @@ -53,6 +55,7 @@ type client struct {
heartbeater *heartbeater
bufferPool bufferPool
exporterName string
meter metric.Meter
}

var jsonStreamPool = sync.Pool{
Expand All @@ -69,6 +72,7 @@ func newClient(set exporter.Settings, cfg *Config, maxContentLength uint) *clien
buildInfo: set.BuildInfo,
bufferPool: newBufferPool(maxContentLength, !cfg.DisableCompression),
exporterName: set.ID.String(),
meter: metadata.Meter(set.TelemetrySettings),
}
}

Expand Down Expand Up @@ -632,7 +636,7 @@ func (c *client) start(ctx context.Context, host component.Host) (err error) {
}
url, _ := c.config.getURL()
c.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(c.config, c.buildInfo), c.logger}
c.heartbeater = newHeartbeater(c.config, c.buildInfo, getPushLogFn(c))
c.heartbeater = newHeartbeater(c.config, c.buildInfo, getPushLogFn(c), c.meter)
if c.config.Heartbeat.Startup {
if err := c.heartbeater.sendHeartbeat(c.config, c.buildInfo, getPushLogFn(c)); err != nil {
return fmt.Errorf("%s: heartbeat on startup failed: %w", c.exporterName, err)
Expand Down
5 changes: 2 additions & 3 deletions exporter/splunkhecexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.104.0
github.com/stretchr/testify v1.9.0
github.com/testcontainers/testcontainers-go v0.31.0
go.opencensus.io v0.24.0
go.opentelemetry.io/collector/component v0.104.0
go.opentelemetry.io/collector/config/confighttp v0.104.0
go.opentelemetry.io/collector/config/configopaque v1.11.0
Expand All @@ -23,7 +22,9 @@ require (
go.opentelemetry.io/collector/exporter v0.104.0
go.opentelemetry.io/collector/pdata v1.11.0
go.opentelemetry.io/collector/semconv v0.104.0
go.opentelemetry.io/otel v1.27.0
go.opentelemetry.io/otel/metric v1.27.0
go.opentelemetry.io/otel/sdk/metric v1.27.0
go.opentelemetry.io/otel/trace v1.27.0
go.uber.org/goleak v1.3.0
go.uber.org/multierr v1.11.0
Expand Down Expand Up @@ -99,10 +100,8 @@ require (
go.opentelemetry.io/collector/featuregate v1.11.0 // indirect
go.opentelemetry.io/collector/receiver v0.104.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.52.0 // indirect
go.opentelemetry.io/otel v1.27.0 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.49.0 // indirect
go.opentelemetry.io/otel/sdk v1.27.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.27.0 // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.26.0 // indirect
Expand Down
76 changes: 0 additions & 76 deletions exporter/splunkhecexporter/go.sum

Large diffs are not rendered by default.

66 changes: 25 additions & 41 deletions exporter/splunkhecexporter/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@ import (
"runtime"
"time"

"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk"
)
Expand All @@ -36,52 +35,38 @@ func getMetricsName(overrides map[string]string, metricName string) string {
return metricName
}

func newHeartbeater(config *Config, buildInfo component.BuildInfo, pushLogFn func(ctx context.Context, ld plog.Logs) error) *heartbeater {
func newHeartbeater(config *Config, buildInfo component.BuildInfo, pushLogFn func(ctx context.Context, ld plog.Logs) error, meter metric.Meter) *heartbeater {
interval := config.Heartbeat.Interval
if interval == 0 {
return nil
}

var heartbeatsSent, heartbeatsFailed *stats.Int64Measure
var tagMutators []tag.Mutator
var heartbeatsSent, heartbeatsFailed metric.Int64Counter
var attrs attribute.Set
if config.Telemetry.Enabled {
overrides := config.Telemetry.OverrideMetricsNames
extraAttributes := config.Telemetry.ExtraAttributes
var tags []tag.Key
tagMutators = []tag.Mutator{}
var tags []attribute.KeyValue
for key, val := range extraAttributes {
newTag, _ := tag.NewKey(key)
tags = append(tags, newTag)
tagMutators = append(tagMutators, tag.Insert(newTag, val))
tags = append(tags, attribute.String(key, val))
}

heartbeatsSent = stats.Int64(
attrs = attribute.NewSet(tags...)
var err error
heartbeatsSent, err = meter.Int64Counter(
getMetricsName(overrides, defaultHBSentMetricsName),
"number of heartbeats sent",
stats.UnitDimensionless)

heartbeatsSentView := &view.View{
Name: heartbeatsSent.Name(),
Description: heartbeatsSent.Description(),
TagKeys: tags,
Measure: heartbeatsSent,
Aggregation: view.Sum(),
metric.WithDescription("number of heartbeats sent"),
metric.WithUnit("1"),
)
if err != nil {
return nil
}

heartbeatsFailed = stats.Int64(
heartbeatsFailed, err = meter.Int64Counter(
getMetricsName(overrides, defaultHBFailedMetricsName),
"number of heartbeats failed",
stats.UnitDimensionless)

heartbeatsFailedView := &view.View{
Name: heartbeatsFailed.Name(),
Description: heartbeatsFailed.Description(),
TagKeys: tags,
Measure: heartbeatsFailed,
Aggregation: view.Sum(),
}

if err := view.Register(heartbeatsSentView, heartbeatsFailedView); err != nil {
metric.WithDescription("number of heartbeats failed"),
metric.WithUnit("1"),
)
if err != nil {
return nil
}
}
Expand All @@ -99,7 +84,7 @@ func newHeartbeater(config *Config, buildInfo component.BuildInfo, pushLogFn fun
case <-ticker.C:
err := hbter.sendHeartbeat(config, buildInfo, pushLogFn)
if config.Telemetry.Enabled {
observe(heartbeatsSent, heartbeatsFailed, tagMutators, err)
observe(heartbeatsSent, heartbeatsFailed, attrs, err)
}
}
}
Expand All @@ -116,14 +101,13 @@ func (h *heartbeater) sendHeartbeat(config *Config, buildInfo component.BuildInf
}

// there is only use case for open census metrics recording for now. Extend to use open telemetry in the future.
func observe(heartbeatsSent *stats.Int64Measure, heartbeatsFailed *stats.Int64Measure, tagMutators []tag.Mutator, err error) {
var counter *stats.Int64Measure
func observe(heartbeatsSent, heartbeatsFailed metric.Int64Counter, attrs attribute.Set, err error) {
if err == nil {
counter = heartbeatsSent
heartbeatsSent.Add(context.Background(), 1, metric.WithAttributeSet(attrs))
} else {
counter = heartbeatsFailed
heartbeatsFailed.Add(context.Background(), 1, metric.WithAttributeSet(attrs))
}
_ = stats.RecordWithTags(context.Background(), tagMutators, counter.M(1))

}

func generateHeartbeatLog(hecToOtelAttrs splunk.HecToOtelAttrs, buildInfo component.BuildInfo) plog.Logs {
Expand Down
96 changes: 54 additions & 42 deletions exporter/splunkhecexporter/heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ import (
"time"

"github.com/stretchr/testify/assert"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/otel/attribute"
metricnoop "go.opentelemetry.io/otel/metric/noop"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

const (
Expand All @@ -36,9 +39,9 @@ func createTestConfig(metricsOverrides map[string]string, enableMetrics bool) *C
return config
}

func initHeartbeater(t *testing.T, metricsOverrides map[string]string, enableMetrics bool, consumeFn func(ctx context.Context, ld plog.Logs) error) {
func initHeartbeater(t *testing.T, metricsOverrides map[string]string, enableMetrics bool, consumeFn func(ctx context.Context, ld plog.Logs) error, mp *sdkmetric.MeterProvider) {
config := createTestConfig(metricsOverrides, enableMetrics)
hbter := newHeartbeater(config, component.NewDefaultBuildInfo(), consumeFn)
hbter := newHeartbeater(config, component.NewDefaultBuildInfo(), consumeFn, mp.Meter("test"))
t.Cleanup(func() {
hbter.shutdown()
})
Expand All @@ -49,42 +52,42 @@ func assertHeartbeatInfoLog(t *testing.T, l plog.Logs) {
assert.Contains(t, l.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().AsString(), "HeartbeatInfo")
}

func getMetricValue(metricName string) []float64 {
viewData, _ := view.RetrieveData(metricName)
var ret []float64
if len(viewData) > 0 {
for _, data := range viewData {
ret = append(ret, data.Data.(*view.SumData).Value)
func getMetricValue(reader *sdkmetric.ManualReader, name string) ([]int64, error) {
var md metricdata.ResourceMetrics
err := reader.Collect(context.Background(), &md)
var ret []int64
for _, sm := range md.ScopeMetrics {
for _, m := range sm.Metrics {
if m.Name == name {
g := m.Data.(metricdata.Sum[int64])
ret = append(ret, g.DataPoints[0].Value)
}
}
}
return ret
return ret, err
}

func getTags(metricName string) [][]tag.Tag {
viewData, _ := view.RetrieveData(metricName)
var ret [][]tag.Tag
if len(viewData) > 0 {
for _, data := range viewData {
ret = append(ret, data.Tags)
}
}
return ret
}

func resetMetrics(metricsNames ...string) {
for _, metricsName := range metricsNames {
if v := view.Find(metricsName); v != nil {
view.Unregister(v)
func getAttributes(reader *sdkmetric.ManualReader, name string) ([]attribute.Set, error) {
var md metricdata.ResourceMetrics
err := reader.Collect(context.Background(), &md)
var ret []attribute.Set
for _, sm := range md.ScopeMetrics {
for _, m := range sm.Metrics {
if m.Name == name {
g := m.Data.(metricdata.Sum[int64])
ret = append(ret, g.DataPoints[0].Attributes)
}
}
}
return ret, err
}

func Test_newHeartbeater_disabled(t *testing.T) {
config := createTestConfig(map[string]string{}, false)
config.Heartbeat.Interval = 0
hb := newHeartbeater(config, component.NewDefaultBuildInfo(), func(_ context.Context, _ plog.Logs) error {
return nil
})
}, metricnoop.NewMeterProvider().Meter("test"))
assert.Nil(t, hb)
}

Expand Down Expand Up @@ -115,7 +118,9 @@ func Test_Heartbeat_success(t *testing.T) {
consumeLogsChan <- ld
return nil
}
initHeartbeater(t, tt.metricsOverrides, true, consumeFn)
reader := sdkmetric.NewManualReader()
meterProvider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
initHeartbeater(t, tt.metricsOverrides, true, consumeFn, meterProvider)

assert.Eventually(t, func() bool {
return len(consumeLogsChan) != 0
Expand All @@ -126,31 +131,38 @@ func Test_Heartbeat_success(t *testing.T) {

if tt.enableMetrics {
sentMetricsName := getMetricsName(tt.metricsOverrides, defaultHBSentMetricsName)
failedMetricsName := getMetricsName(tt.metricsOverrides, defaultHBFailedMetricsName)

var got []int64
var err error
assert.Eventually(t, func() bool {
return len(getMetricValue(sentMetricsName)) != 0
got, err = getMetricValue(reader, sentMetricsName)
require.NoError(t, err)
return len(got) != 0
}, time.Second, 10*time.Millisecond)
assert.Greater(t, getMetricValue(sentMetricsName)[0], float64(0), "there should be at least one success metric datapoint")
metricLabelKeyTag, _ := tag.NewKey(metricLabelKey)
assert.Equal(t, []tag.Tag{{Key: metricLabelKeyTag, Value: metricLabelVal}}, getTags(sentMetricsName)[0])

resetMetrics(sentMetricsName, failedMetricsName)
assert.Greater(t, got[0], int64(0), "there should be at least one success metric datapoint")
attrs, err := getAttributes(reader, sentMetricsName)
require.NoError(t, err)
assert.Equal(t, attribute.NewSet(attribute.String(metricLabelKey, metricLabelVal)), attrs[0])
}
}
}

func Test_Heartbeat_failure(t *testing.T) {
resetMetrics()
consumeFn := func(_ context.Context, _ plog.Logs) error {
return errors.New("always error")
}
initHeartbeater(t, map[string]string{}, true, consumeFn)
reader := sdkmetric.NewManualReader()
meterProvider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
initHeartbeater(t, map[string]string{}, true, consumeFn, meterProvider)

var got []int64
var err error
assert.Eventually(t, func() bool {
return len(getMetricValue(defaultHBFailedMetricsName)) != 0
got, err = getMetricValue(reader, defaultHBFailedMetricsName)
require.NoError(t, err)
return len(got) != 0
}, time.Second, 10*time.Millisecond)
assert.Greater(t, getMetricValue(defaultHBFailedMetricsName)[0], float64(0), "there should be at least one failure metric datapoint")
metricLabelKeyTag, _ := tag.NewKey(metricLabelKey)
assert.Equal(t, []tag.Tag{{Key: metricLabelKeyTag, Value: metricLabelVal}}, getTags(defaultHBFailedMetricsName)[0])
assert.Greater(t, got[0], int64(0), "there should be at least one failure metric datapoint")
attrs, err := getAttributes(reader, defaultHBFailedMetricsName)
require.NoError(t, err)
assert.Equal(t, attribute.NewSet(attribute.String(metricLabelKey, metricLabelVal)), attrs[0])
}
1 change: 0 additions & 1 deletion receiver/splunkhecreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ require (
github.com/prometheus/common v0.54.0 // indirect
github.com/prometheus/procfs v0.15.0 // indirect
github.com/rs/cors v1.11.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/collector v0.104.0 // indirect
go.opentelemetry.io/collector/config/configauth v0.104.0 // indirect
go.opentelemetry.io/collector/config/configcompression v1.11.0 // indirect
Expand Down
Loading

0 comments on commit f1ba946

Please sign in to comment.