
[processor/resourcedetection] introduce retry mechanism for detectors #37506

Merged (30 commits, Feb 25, 2025)

Commits
697f47c
[processor/resourcedetection] introduce retry mechanism for detectors
odubajDT Jan 27, 2025
664d597
working version
odubajDT Jan 27, 2025
5b0c44b
disable non-working tests
odubajDT Jan 27, 2025
8edbf9d
testing
odubajDT Jan 27, 2025
ec16ba2
fix
odubajDT Jan 27, 2025
2c3caa6
blocking/non-blocking version
odubajDT Jan 28, 2025
a3d7b69
unit tests
odubajDT Jan 28, 2025
7244e39
integration test
odubajDT Jan 28, 2025
0b7073a
fix test + polishing
odubajDT Jan 28, 2025
b5a70ae
fix readme
odubajDT Feb 12, 2025
9aa627c
async_detection
odubajDT Feb 13, 2025
50b3221
fix test
odubajDT Feb 13, 2025
39f2327
fix test
odubajDT Feb 13, 2025
f891f09
Merge branch 'main' into docker-resource-failure
odubajDT Feb 19, 2025
5f2fbc6
fix lint
odubajDT Feb 19, 2025
9f22084
Merge branch 'main' into docker-resource-failure
odubajDT Feb 19, 2025
3b4bece
remove asunc_detection
odubajDT Feb 20, 2025
d4f0da9
polish tests
odubajDT Feb 20, 2025
57fc92b
adapt readme
odubajDT Feb 20, 2025
f161d05
Merge branch 'main' into docker-resource-failure
odubajDT Feb 20, 2025
30b5327
Merge branch 'main' into docker-resource-failure
odubajDT Feb 21, 2025
c89319f
fix after rebase
odubajDT Feb 21, 2025
527e4a8
adapt according to PR review
odubajDT Feb 24, 2025
498b6ad
Merge branch 'main' into docker-resource-failure
odubajDT Feb 24, 2025
40099b6
add error propagation back
odubajDT Feb 24, 2025
cbdc42f
polish
odubajDT Feb 24, 2025
70a67d8
introduce lib for exponential backoff
odubajDT Feb 25, 2025
e7d7706
Merge branch 'main' into docker-resource-failure
odubajDT Feb 25, 2025
d9d7398
Merge branch 'main' into docker-resource-failure
odubajDT Feb 25, 2025
9d44ca7
go mod tidy
odubajDT Feb 25, 2025
27 changes: 27 additions & 0 deletions .chloggen/docker-resource-failure.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: processor/resourcedetection

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Introduce retry logic for failed resource detection."

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34761]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
2 changes: 2 additions & 0 deletions connector/datadogconnector/go.sum

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions exporter/datadogexporter/go.mod
@@ -178,6 +178,7 @@ require (
 	github.com/bmatcuk/doublestar/v4 v4.8.1 // indirect
 	github.com/briandowns/spinner v1.23.0 // indirect
 	github.com/cenkalti/backoff v2.2.1+incompatible // indirect
+	github.com/cenkalti/backoff/v5 v5.0.2 // indirect
 	github.com/cespare/xxhash/v2 v2.3.0 // indirect
 	github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575 // indirect
 	github.com/cncf/xds/go v0.0.0-20240905190251-b4127c9b8d78 // indirect
2 changes: 2 additions & 0 deletions exporter/datadogexporter/go.sum


2 changes: 2 additions & 0 deletions exporter/datadogexporter/integrationtest/go.sum


1 change: 1 addition & 0 deletions processor/resourcedetectionprocessor/go.mod
@@ -10,6 +10,7 @@ require (
 	github.com/aws/aws-sdk-go-v2/config v1.29.7
 	github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.29
 	github.com/aws/aws-sdk-go-v2/service/ec2 v1.203.1
+	github.com/cenkalti/backoff/v5 v5.0.2
 	github.com/google/go-cmp v0.6.0
 	github.com/hashicorp/consul/api v1.31.2
 	github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/ecsutil v0.120.1
2 changes: 2 additions & 0 deletions processor/resourcedetectionprocessor/go.sum


@@ -7,11 +7,13 @@ package internal // import "github.com/open-telemetry/opentelemetry-collector-co
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"net/http"
 	"sync"
 	"time"
 
+	backoff "github.com/cenkalti/backoff/v5"
 	"go.opentelemetry.io/collector/featuregate"
 	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.opentelemetry.io/collector/processor"
@@ -120,31 +122,64 @@ func (p *ResourceProvider) Get(ctx context.Context, client *http.Client) (resour
 		var cancel context.CancelFunc
 		ctx, cancel = context.WithTimeout(ctx, client.Timeout)
 		defer cancel()
-		p.detectResource(ctx)
+		p.detectResource(ctx, client.Timeout)
 	})
 
 	return p.detectedResource.resource, p.detectedResource.schemaURL, p.detectedResource.err
 }
 
-func (p *ResourceProvider) detectResource(ctx context.Context) {
+func (p *ResourceProvider) detectResource(ctx context.Context, timeout time.Duration) {
 	p.detectedResource = &resourceResult{}
 
 	res := pcommon.NewResource()
 	mergedSchemaURL := ""
 
 	p.logger.Info("began detecting resource information")
 
-	for _, detector := range p.detectors {
-		r, schemaURL, err := detector.Detect(ctx)
-		if err != nil {
-			p.logger.Warn("failed to detect resource", zap.Error(err))
+	resultsChan := make([]chan resourceResult, len(p.detectors))
+	for i, detector := range p.detectors {
+		resultsChan[i] = make(chan resourceResult)
+		go func(detector Detector) {
+			sleep := backoff.ExponentialBackOff{
+				InitialInterval:     1 * time.Second,
+				RandomizationFactor: 1.5,
+				Multiplier:          2,
+				MaxInterval:         timeout,
+			}
+			sleep.Reset()
+			var err error
+			var r pcommon.Resource
+			var schemaURL string
+			for {
+				r, schemaURL, err = detector.Detect(ctx)
+				if err == nil {
+					resultsChan[i] <- resourceResult{resource: r, schemaURL: schemaURL, err: nil}
+					return
+				}
+				p.logger.Warn("failed to detect resource", zap.Error(err))
+
+				timer := time.NewTimer(sleep.NextBackOff())
+				select {
+				case <-timer.C:
+					fmt.Println("Retrying fetching data...")
+				case <-ctx.Done():
+					p.logger.Warn("Context was cancelled: %w", zap.Error(ctx.Err()))
+					resultsChan[i] <- resourceResult{resource: r, schemaURL: schemaURL, err: err}
+					return
+				}
+			}
+		}(detector)
+	}
+
+	for _, ch := range resultsChan {
+		result := <-ch
+		if result.err != nil {
 			if allowErrorPropagationFeatureGate.IsEnabled() {
-				p.detectedResource.err = err
-				return
+				p.detectedResource.err = errors.Join(p.detectedResource.err, result.err)
 			}
 		} else {
-			mergedSchemaURL = MergeSchemaURL(mergedSchemaURL, schemaURL)
-			MergeResource(res, r, false)
+			mergedSchemaURL = MergeSchemaURL(mergedSchemaURL, result.schemaURL)
+			MergeResource(res, result.resource, false)
 		}
 	}

@@ -107,7 +107,7 @@ func TestDetect(t *testing.T) {
 			p, err := f.CreateResourceProvider(processortest.NewNopSettings(metadata.Type), time.Second, tt.attributes, &mockDetectorConfig{}, mockDetectorTypes...)
 			require.NoError(t, err)
 
-			got, _, err := p.Get(context.Background(), http.DefaultClient)
+			got, _, err := p.Get(context.Background(), &http.Client{Timeout: 10 * time.Second})
 			require.NoError(t, err)
 
 			assert.Equal(t, tt.expectedResource, got.Attributes().AsRaw())
@@ -133,38 +133,46 @@ func TestDetectResource_DetectorFactoryError(t *testing.T) {
 	require.EqualError(t, err, fmt.Sprintf("failed creating detector type %q: %v", mockDetectorKey, "creation failed"))
 }
 
-func TestDetectResource_Error(t *testing.T) {
-	md1 := &MockDetector{}
-	res := pcommon.NewResource()
-	require.NoError(t, res.Attributes().FromRaw(map[string]any{"a": "1", "b": "2"}))
-	md1.On("Detect").Return(res, nil)
-
-	md2 := &MockDetector{}
-	md2.On("Detect").Return(pcommon.NewResource(), errors.New("err1"))
-
-	p := NewResourceProvider(zap.NewNop(), time.Second, nil, md1, md2)
-	_, _, err := p.Get(context.Background(), http.DefaultClient)
-	require.NoError(t, err)
-}
-
-func TestDetectResource_Error_PropagationEnabled(t *testing.T) {
+func TestDetectResource_Error_ContextDeadline_WithErrPropagation(t *testing.T) {
 	err := featuregate.GlobalRegistry().Set(allowErrorPropagationFeatureGate.ID(), true)
 	assert.NoError(t, err)
 	defer func() {
 		_ = featuregate.GlobalRegistry().Set(allowErrorPropagationFeatureGate.ID(), false)
 	}()
 
 	md1 := &MockDetector{}
-	res := pcommon.NewResource()
-	require.NoError(t, res.Attributes().FromRaw(map[string]any{"a": "1", "b": "2"}))
-	md1.On("Detect").Return(res, nil)
+	md1.On("Detect").Return(pcommon.NewResource(), fmt.Errorf("err1"))
 
 	md2 := &MockDetector{}
-	md2.On("Detect").Return(pcommon.NewResource(), errors.New("err1"))
+	md2.On("Detect").Return(pcommon.NewResource(), errors.New("err2"))
 
 	p := NewResourceProvider(zap.NewNop(), time.Second, nil, md1, md2)
-	_, _, err = p.Get(context.Background(), http.DefaultClient)
+
+	var cancel context.CancelFunc
+	ctx, cancel := context.WithTimeout(context.TODO(), 3*time.Second)
+	defer cancel()
+
+	_, _, err = p.Get(ctx, &http.Client{Timeout: 10 * time.Second})
 	require.Error(t, err)
+	require.Contains(t, err.Error(), "err1")
+	require.Contains(t, err.Error(), "err2")
 }
 
+func TestDetectResource_Error_ContextDeadline_WithoutErrPropagation(t *testing.T) {
+	md1 := &MockDetector{}
+	md1.On("Detect").Return(pcommon.NewResource(), fmt.Errorf("err1"))
+
+	md2 := &MockDetector{}
+	md2.On("Detect").Return(pcommon.NewResource(), errors.New("err2"))
+
+	p := NewResourceProvider(zap.NewNop(), time.Second, nil, md1, md2)
+
+	var cancel context.CancelFunc
+	ctx, cancel := context.WithTimeout(context.TODO(), 3*time.Second)
+	defer cancel()
+
+	_, _, err := p.Get(ctx, &http.Client{Timeout: 10 * time.Second})
+	require.NoError(t, err)
+}

func TestMergeResource(t *testing.T) {
@@ -230,20 +238,17 @@ func TestDetectResource_Parallel(t *testing.T) {
 	require.NoError(t, res2.Attributes().FromRaw(map[string]any{"a": "11", "c": "3"}))
 	md2.On("Detect").Return(res2, nil)
 
-	md3 := NewMockParallelDetector()
-	md3.On("Detect").Return(pcommon.NewResource(), errors.New("an error"))
-
 	expectedResourceAttrs := map[string]any{"a": "1", "b": "2", "c": "3"}
 
-	p := NewResourceProvider(zap.NewNop(), time.Second, nil, md1, md2, md3)
+	p := NewResourceProvider(zap.NewNop(), time.Second, nil, md1, md2)
 
 	// call p.Get multiple times
 	wg := &sync.WaitGroup{}
 	wg.Add(iterations)
 	for i := 0; i < iterations; i++ {
 		go func() {
 			defer wg.Done()
-			detected, _, err := p.Get(context.Background(), http.DefaultClient)
+			detected, _, err := p.Get(context.Background(), &http.Client{Timeout: 10 * time.Second})
 			assert.NoError(t, err)
 			assert.Equal(t, expectedResourceAttrs, detected.Attributes().AsRaw())
 		}()
@@ -255,13 +260,36 @@ func TestDetectResource_Parallel(t *testing.T) {
 	// detector.Detect should only be called once, so we only need to notify each channel once
 	md1.ch <- struct{}{}
 	md2.ch <- struct{}{}
-	md3.ch <- struct{}{}
 
 	// then wait until all goroutines are finished, and ensure p.Detect was only called once
 	wg.Wait()
 	md1.AssertNumberOfCalls(t, "Detect", 1)
 	md2.AssertNumberOfCalls(t, "Detect", 1)
-	md3.AssertNumberOfCalls(t, "Detect", 1)
 }
 
+func TestDetectResource_Reconnect(t *testing.T) {
+	md1 := &MockDetector{}
+	res1 := pcommon.NewResource()
+	require.NoError(t, res1.Attributes().FromRaw(map[string]any{"a": "1", "b": "2"}))
+	md1.On("Detect").Return(pcommon.NewResource(), errors.New("connection error1")).Twice()
+	md1.On("Detect").Return(res1, nil)
+
+	md2 := &MockDetector{}
+	res2 := pcommon.NewResource()
+	require.NoError(t, res2.Attributes().FromRaw(map[string]any{"c": "3"}))
+	md2.On("Detect").Return(pcommon.NewResource(), errors.New("connection error2")).Once()
+	md2.On("Detect").Return(res2, nil)
+
+	expectedResourceAttrs := map[string]any{"a": "1", "b": "2", "c": "3"}
+
+	p := NewResourceProvider(zap.NewNop(), time.Second, nil, md1, md2)
+
+	detected, _, err := p.Get(context.Background(), &http.Client{Timeout: 15 * time.Second})
+	assert.NoError(t, err)
+	assert.Equal(t, expectedResourceAttrs, detected.Attributes().AsRaw())
+
+	md1.AssertNumberOfCalls(t, "Detect", 3) // 2 errors + 1 success
+	md2.AssertNumberOfCalls(t, "Detect", 2) // 1 error + 1 success
+}

func TestFilterAttributes_Match(t *testing.T) {