Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter] Feature gate for queue batcher #11721

Merged
merged 4 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions exporter/debugexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ require (
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
Expand All @@ -45,6 +46,7 @@ require (
go.opentelemetry.io/collector/consumer/consumertest v0.114.0 // indirect
go.opentelemetry.io/collector/extension v0.114.0 // indirect
go.opentelemetry.io/collector/extension/experimental/storage v0.114.0 // indirect
go.opentelemetry.io/collector/featuregate v1.20.0 // indirect
go.opentelemetry.io/collector/pipeline v0.114.0 // indirect
go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.114.0 // indirect
go.opentelemetry.io/collector/receiver v0.114.0 // indirect
Expand Down Expand Up @@ -115,3 +117,5 @@ replace go.opentelemetry.io/collector/consumer/consumererror => ../../consumer/c
replace go.opentelemetry.io/collector/extension/extensiontest => ../../extension/extensiontest

replace go.opentelemetry.io/collector/scraper => ../../scraper

replace go.opentelemetry.io/collector/featuregate => ../../featuregate
2 changes: 2 additions & 0 deletions exporter/debugexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions exporter/exporterhelper/exporterhelperprofiles/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.114.0 // indirect
go.opentelemetry.io/collector/extension v0.114.0 // indirect
go.opentelemetry.io/collector/extension/experimental/storage v0.114.0 // indirect
go.opentelemetry.io/collector/featuregate v1.20.0 // indirect
go.opentelemetry.io/collector/pdata v1.20.0 // indirect
go.opentelemetry.io/collector/pipeline v0.114.0 // indirect
go.opentelemetry.io/collector/receiver v0.114.0 // indirect
Expand Down Expand Up @@ -102,3 +104,5 @@ replace go.opentelemetry.io/collector/consumer/consumererror => ../../../consume
replace go.opentelemetry.io/collector/extension/extensiontest => ../../../extension/extensiontest

replace go.opentelemetry.io/collector/scraper => ../../../scraper

replace go.opentelemetry.io/collector/featuregate => ../../../featuregate
2 changes: 2 additions & 0 deletions exporter/exporterhelper/exporterhelperprofiles/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions exporter/exporterhelper/internal/base_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,17 @@ import (
"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/exporter/exporterqueue" // BaseExporter contains common fields between different exporter types.
"go.opentelemetry.io/collector/exporter/internal"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/pipeline"
)

var usePullingBasedExporterQueueBatcher = featuregate.GlobalRegistry().MustRegister(
"telemetry.UsePullingBasedExporterQueueBatcher",
featuregate.StageBeta,
featuregate.WithRegisterFromVersion("v0.114.0"),
featuregate.WithRegisterDescription("if set to true, turns on the pulling-based exporter queue bathcer"),
)

type ObsrepSenderFactory = func(obsrep *ObsReport) RequestSender

// Option apply changes to BaseExporter.
Expand Down
102 changes: 65 additions & 37 deletions exporter/exporterhelper/internal/base_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,52 +38,80 @@ func newNoopObsrepSender(*ObsReport) RequestSender {
}

func TestBaseExporter(t *testing.T) {
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender)
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
runTest := func(testName string, enableQueueBatcher bool) {
t.Run(testName, func(t *testing.T) {
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender)
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
})
}
runTest("enable_queue_batcher", true)
runTest("disable_queue_batcher", false)
}

func TestBaseExporterWithOptions(t *testing.T) {
want := errors.New("my error")
be, err := NewBaseExporter(
defaultSettings, defaultSignal, newNoopObsrepSender,
WithStart(func(context.Context, component.Host) error { return want }),
WithShutdown(func(context.Context) error { return want }),
WithTimeout(NewDefaultTimeoutConfig()),
)
require.NoError(t, err)
require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost()))
require.Equal(t, want, be.Shutdown(context.Background()))
runTest := func(testName string, enableQueueBatcher bool) {
t.Run(testName, func(t *testing.T) {
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
want := errors.New("my error")
be, err := NewBaseExporter(
defaultSettings, defaultSignal, newNoopObsrepSender,
WithStart(func(context.Context, component.Host) error { return want }),
WithShutdown(func(context.Context) error { return want }),
WithTimeout(NewDefaultTimeoutConfig()),
)
require.NoError(t, err)
require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost()))
require.Equal(t, want, be.Shutdown(context.Background()))
})
}
runTest("enable_queue_batcher", true)
runTest("disable_queue_batcher", false)
}

func TestQueueOptionsWithRequestExporter(t *testing.T) {
bs, err := NewBaseExporter(exportertest.NewNopSettings(), defaultSignal, newNoopObsrepSender,
WithRetry(configretry.NewDefaultBackOffConfig()))
require.NoError(t, err)
require.Nil(t, bs.Marshaler)
require.Nil(t, bs.Unmarshaler)
_, err = NewBaseExporter(exportertest.NewNopSettings(), defaultSignal, newNoopObsrepSender,
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(NewDefaultQueueConfig()))
require.Error(t, err)
runTest := func(testName string, enableQueueBatcher bool) {
t.Run(testName, func(t *testing.T) {
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
bs, err := NewBaseExporter(exportertest.NewNopSettings(), defaultSignal, newNoopObsrepSender,
WithRetry(configretry.NewDefaultBackOffConfig()))
require.NoError(t, err)
require.Nil(t, bs.Marshaler)
require.Nil(t, bs.Unmarshaler)
_, err = NewBaseExporter(exportertest.NewNopSettings(), defaultSignal, newNoopObsrepSender,
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(NewDefaultQueueConfig()))
require.Error(t, err)

_, err = NewBaseExporter(exportertest.NewNopSettings(), defaultSignal, newNoopObsrepSender,
WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})),
WithRetry(configretry.NewDefaultBackOffConfig()),
WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[internal.Request]()))
require.Error(t, err)
_, err = NewBaseExporter(exportertest.NewNopSettings(), defaultSignal, newNoopObsrepSender,
WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})),
WithRetry(configretry.NewDefaultBackOffConfig()),
WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[internal.Request]()))
require.Error(t, err)
})
}
runTest("enable_queue_batcher", true)
runTest("disable_queue_batcher", false)
}

func TestBaseExporterLogging(t *testing.T) {
set := exportertest.NewNopSettings()
logger, observed := observer.New(zap.DebugLevel)
set.Logger = zap.New(logger)
rCfg := configretry.NewDefaultBackOffConfig()
rCfg.Enabled = false
bs, err := NewBaseExporter(set, defaultSignal, newNoopObsrepSender, WithRetry(rCfg))
require.NoError(t, err)
sendErr := bs.Send(context.Background(), newErrorRequest())
require.Error(t, sendErr)
runTest := func(testName string, enableQueueBatcher bool) {
t.Run(testName, func(t *testing.T) {
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
set := exportertest.NewNopSettings()
logger, observed := observer.New(zap.DebugLevel)
set.Logger = zap.New(logger)
rCfg := configretry.NewDefaultBackOffConfig()
rCfg.Enabled = false
bs, err := NewBaseExporter(set, defaultSignal, newNoopObsrepSender, WithRetry(rCfg))
require.NoError(t, err)
sendErr := bs.Send(context.Background(), newErrorRequest())
require.Error(t, sendErr)

require.Len(t, observed.FilterLevelExact(zap.ErrorLevel).All(), 1)
require.Len(t, observed.FilterLevelExact(zap.ErrorLevel).All(), 1)
})
}
runTest("enable_queue_batcher", true)
runTest("disable_queue_batcher", false)
}
Loading
Loading