Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/elasticsearch] Deprecate retry::max_requests in favor of retry::max_retries #35571

Merged
merged 9 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: deprecation

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Deprecate retry::max_requests in favor of retry::max_retries

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32344]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: retry::max_retries will be exactly retry::max_requests - 1

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
3 changes: 2 additions & 1 deletion exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ The behaviour of this bulk indexing can be configured with the following setting
- `interval` (default=30s): Write buffer flush time limit.
- `retry`: Elasticsearch bulk request retry settings
- `enabled` (default=true): Enable/Disable request retry on error. Failed requests are retried with exponential backoff.
- `max_requests` (default=3): Number of HTTP request retries.
- `max_requests` (DEPRECATED, use retry::max_retries instead): Number of HTTP request retries including the initial attempt.
- `max_retries` (default=2): Number of HTTP request retries. To disable retries, set `retry::enabled` to `false` instead of setting `max_retries` to `0`.
- `initial_interval` (default=100ms): Initial waiting time if a HTTP request failed.
- `max_interval` (default=1m): Max waiting time if a HTTP request failed.
- `retry_on_status` (default=[429]): Status codes that trigger request or document level retries. Request level retry and document level retry status codes are shared and cannot be configured separately. To avoid duplicates, it defaults to `[429]`.
Expand Down
8 changes: 2 additions & 6 deletions exporter/elasticsearchexporter/bulkindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ func newBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Co
func newSyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Config) *syncBulkIndexer {
var maxDocRetry int
if config.Retry.Enabled {
// max_requests includes initial attempt
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32344
maxDocRetry = config.Retry.MaxRequests - 1
maxDocRetry = config.Retry.MaxRetries
}
return &syncBulkIndexer{
config: docappender.BulkIndexerConfig{
Expand Down Expand Up @@ -166,9 +164,7 @@ func newAsyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, confi

var maxDocRetry int
if config.Retry.Enabled {
// max_requests includes initial attempt
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32344
maxDocRetry = config.Retry.MaxRequests - 1
maxDocRetry = config.Retry.MaxRetries
}

pool := &asyncBulkIndexer{
Expand Down
25 changes: 23 additions & 2 deletions exporter/elasticsearchexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,13 @@ type RetrySettings struct {
// Enabled allows users to disable retry without having to comment out all settings.
Enabled bool `mapstructure:"enabled"`

// MaxRequests configures how often an HTTP request is retried before it is assumed to be failed.
// MaxRequests configures how often an HTTP request is attempted before it is assumed to be failed.
// Deprecated: use MaxRetries instead.
MaxRequests int `mapstructure:"max_requests"`

// MaxRetries configures how many times an HTTP request is retried.
MaxRetries int `mapstructure:"max_retries"`

// InitialInterval configures the initial waiting time if a request failed.
InitialInterval time.Duration `mapstructure:"initial_interval"`

Expand Down Expand Up @@ -269,6 +273,14 @@ func (cfg *Config) Validate() error {
// TODO support confighttp.ClientConfig.Compression
return errors.New("compression is not currently configurable")
}

if cfg.Retry.MaxRequests < 0 {
return errors.New("retry::max_requests should be non-negative")
}
if cfg.Retry.MaxRetries < 0 {
return errors.New("retry::max_retries should be non-negative")
}

return nil
}

Expand Down Expand Up @@ -351,11 +363,20 @@ func (cfg *Config) MappingMode() MappingMode {
return mappingModes[cfg.Mapping.Mode]
}

func logConfigDeprecationWarnings(cfg *Config, logger *zap.Logger) {
func handleDeprecatedConfig(cfg *Config, logger *zap.Logger) error {
if cfg.Mapping.Dedup != nil {
logger.Warn("dedup is deprecated, and is always enabled")
}
if cfg.Mapping.Dedot && cfg.MappingMode() != MappingECS || !cfg.Mapping.Dedot && cfg.MappingMode() == MappingECS {
logger.Warn("dedot has been deprecated: in the future, dedotting will always be performed in ECS mode only")
}
if cfg.Retry.MaxRequests != 0 {
if cfg.Retry.MaxRetries != 0 {
return errors.New("should specify at most one of retry::max_requests and retry::max_retries")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This must be checked in the Config::Validate() method, so that the validate command checks it.

}
cfg.Retry.MaxRetries = cfg.Retry.MaxRequests - 1
// Do not set cfg.Retry.Enabled = false if cfg.Retry.MaxRequest = 1 to avoid breaking change on behavior
logger.Warn("retry::max_requests has been deprecated, and will be removed in a future version. Use retry::max_retries instead.")
}
return nil
}
6 changes: 3 additions & 3 deletions exporter/elasticsearchexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestConfig(t *testing.T) {
},
Retry: RetrySettings{
Enabled: true,
MaxRequests: 5,
MaxRetries: 5,
InitialInterval: 100 * time.Millisecond,
MaxInterval: 1 * time.Minute,
RetryOnStatus: []int{http.StatusTooManyRequests, http.StatusInternalServerError},
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestConfig(t *testing.T) {
},
Retry: RetrySettings{
Enabled: true,
MaxRequests: 5,
MaxRetries: 5,
InitialInterval: 100 * time.Millisecond,
MaxInterval: 1 * time.Minute,
RetryOnStatus: []int{http.StatusTooManyRequests, http.StatusInternalServerError},
Expand Down Expand Up @@ -234,7 +234,7 @@ func TestConfig(t *testing.T) {
},
Retry: RetrySettings{
Enabled: true,
MaxRequests: 5,
MaxRetries: 5,
InitialInterval: 100 * time.Millisecond,
MaxInterval: 1 * time.Minute,
RetryOnStatus: []int{http.StatusTooManyRequests, http.StatusInternalServerError},
Expand Down
14 changes: 2 additions & 12 deletions exporter/elasticsearchexporter/esclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,6 @@ func newElasticsearchClient(
headers := make(http.Header)
headers.Set("User-Agent", userAgent)

// maxRetries configures the maximum number of event publishing attempts,
// including the first send and additional retries.

maxRetries := config.Retry.MaxRequests - 1
retryDisabled := !config.Retry.Enabled || maxRetries <= 0

if retryDisabled {
maxRetries = 0
}

// endpoints converts Config.Endpoints, Config.CloudID,
// and Config.ClientConfig.Endpoint to a list of addresses.
endpoints, err := config.endpoints()
Expand All @@ -125,10 +115,10 @@ func newElasticsearchClient(

// configure retry behavior
RetryOnStatus: config.Retry.RetryOnStatus,
DisableRetry: retryDisabled,
DisableRetry: !config.Retry.Enabled,
EnableRetryOnTimeout: config.Retry.Enabled,
//RetryOnError: retryOnError, // should be used from esclient version 8 onwards
MaxRetries: maxRetries,
MaxRetries: config.Retry.MaxRetries,
RetryBackoff: createElasticsearchBackoffFunc(&config.Retry),

// configure sniffing
Expand Down
10 changes: 3 additions & 7 deletions exporter/elasticsearchexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,14 +349,10 @@ func TestExporterLogs(t *testing.T) {

t.Run("no retry", func(t *testing.T) {
configurations := map[string]func(*Config){
"max_requests limited": func(cfg *Config) {
cfg.Retry.MaxRequests = 1
cfg.Retry.InitialInterval = 1 * time.Millisecond
cfg.Retry.MaxInterval = 10 * time.Millisecond
},
"retry.enabled is false": func(cfg *Config) {
cfg.Retry.Enabled = false
cfg.Retry.MaxRequests = 10
cfg.Retry.RetryOnStatus = []int{429}
cfg.Retry.MaxRetries = 10
cfg.Retry.InitialInterval = 1 * time.Millisecond
cfg.Retry.MaxInterval = 10 * time.Millisecond
},
Expand All @@ -366,7 +362,7 @@ func TestExporterLogs(t *testing.T) {
"fail http request": func(attempts *atomic.Int64) bulkHandler {
return func([]itemRequest) ([]itemResponse, error) {
attempts.Add(1)
return nil, &httpTestError{message: "oops"}
return nil, &httpTestError{message: "oops", status: 429}
}
},
"fail item": func(attempts *atomic.Int64) bulkHandler {
Expand Down
15 changes: 10 additions & 5 deletions exporter/elasticsearchexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func createDefaultConfig() component.Config {
},
Retry: RetrySettings{
Enabled: true,
MaxRequests: 3,
MaxRetries: 2,
InitialInterval: 100 * time.Millisecond,
MaxInterval: 1 * time.Minute,
RetryOnStatus: []int{
Expand Down Expand Up @@ -110,7 +110,9 @@ func createLogsExporter(
set.Logger.Warn("index option are deprecated and replaced with logs_index and traces_index.")
index = cf.Index
}
logConfigDeprecationWarnings(cf, set.Logger)
if err := handleDeprecatedConfig(cf, set.Logger); err != nil {
return nil, err
}

exporter := newExporter(cf, set, index, cf.LogsDynamicIndex.Enabled)

Expand All @@ -129,8 +131,9 @@ func createMetricsExporter(
cfg component.Config,
) (exporter.Metrics, error) {
cf := cfg.(*Config)
logConfigDeprecationWarnings(cf, set.Logger)

if err := handleDeprecatedConfig(cf, set.Logger); err != nil {
return nil, err
}
exporter := newExporter(cf, set, cf.MetricsIndex, cf.MetricsDynamicIndex.Enabled)

return exporterhelper.NewMetricsExporter(
Expand All @@ -147,7 +150,9 @@ func createTracesExporter(ctx context.Context,
cfg component.Config,
) (exporter.Traces, error) {
cf := cfg.(*Config)
logConfigDeprecationWarnings(cf, set.Logger)
if err := handleDeprecatedConfig(cf, set.Logger); err != nil {
return nil, err
}

exporter := newExporter(cf, set, cf.TracesIndex, cf.TracesDynamicIndex.Enabled)

Expand Down
19 changes: 19 additions & 0 deletions exporter/elasticsearchexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,22 @@ func TestFactory_DedotDeprecated(t *testing.T) {
assert.Equal(t, "dedot has been deprecated: in the future, dedotting will always be performed in ECS mode only", record.Message)
}
}

func TestFactory_MaxRetries(t *testing.T) {
set := exportertest.NewNopSettings()

cfg := withDefaultConfig(func(cfg *Config) {
cfg.Retry.MaxRetries = 1
cfg.Retry.MaxRequests = 1
})

factory := NewFactory()
_, err := factory.CreateLogsExporter(context.Background(), set, cfg)
require.ErrorContains(t, err, "should specify at most one of retry::max_requests and retry::max_retries")

_, err = factory.CreateTracesExporter(context.Background(), set, cfg)
require.ErrorContains(t, err, "should specify at most one of retry::max_requests and retry::max_retries")

_, err = factory.CreateMetricsExporter(context.Background(), set, cfg)
require.ErrorContains(t, err, "should specify at most one of retry::max_requests and retry::max_retries")
}
6 changes: 3 additions & 3 deletions exporter/elasticsearchexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ elasticsearch/trace:
flush:
bytes: 10485760
retry:
max_requests: 5
max_retries: 5
retry_on_status:
- 429
- 500
Expand All @@ -38,7 +38,7 @@ elasticsearch/metric:
flush:
bytes: 10485760
retry:
max_requests: 5
max_retries: 5
retry_on_status:
- 429
- 500
Expand All @@ -61,7 +61,7 @@ elasticsearch/log:
flush:
bytes: 10485760
retry:
max_requests: 5
max_retries: 5
retry_on_status:
- 429
- 500
Expand Down
Loading