Skip to content

Commit

Permalink
Merge branch 'main' into concurrent_collect_bugfix
Browse files Browse the repository at this point in the history
  • Loading branch information
pellared committed Jun 29, 2023
2 parents 4b5714d + 64e76f8 commit 914a931
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 3 deletions.
1 change: 1 addition & 0 deletions sdk/metric/manual_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func (mr *ManualReader) Shutdown(context.Context) error {
//
// Collect will return an error if called after shutdown.
// Collect will return an error if rm is a nil ResourceMetrics.
// Collect will return an error if the context's Done channel is closed.
//
// This method is safe to call concurrently.
func (mr *ManualReader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error {
Expand Down
50 changes: 50 additions & 0 deletions sdk/metric/manual_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"

"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

Expand Down Expand Up @@ -71,3 +74,50 @@ func TestManualReaderTemporality(t *testing.T) {
})
}
}

func TestManualReaderCollect(t *testing.T) {
expiredCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-1))
defer cancel()

tests := []struct {
name string

ctx context.Context
resourceMetrics *metricdata.ResourceMetrics

expectedErr error
}{
{
name: "with a valid context",

ctx: context.Background(),
resourceMetrics: &metricdata.ResourceMetrics{},
},
{
name: "with an expired context",

ctx: expiredCtx,
resourceMetrics: &metricdata.ResourceMetrics{},

expectedErr: context.DeadlineExceeded,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rdr := NewManualReader()
mp := NewMeterProvider(WithReader(rdr))
meter := mp.Meter("test")

// Ensure the pipeline has a callback setup
testM, err := meter.Int64ObservableCounter("test")
assert.NoError(t, err)
_, err = meter.RegisterCallback(func(_ context.Context, o metric.Observer) error {
return nil
}, testM)
assert.NoError(t, err)

assert.Equal(t, tt.expectedErr, rdr.Collect(tt.ctx, tt.resourceMetrics))
})
}
}
6 changes: 4 additions & 2 deletions sdk/metric/periodic_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,10 @@ func (r *PeriodicReader) collectAndExport(ctx context.Context) error {
// data is not exported to the configured exporter, it is left to the caller to
// handle that if desired.
//
// An error is returned if this is called after Shutdown, if rm is nil or if
// the duration of the collect and export exceeded the timeout.
// Collect will return an error if called after shutdown.
// Collect will return an error if rm is a nil ResourceMetrics.
// Collect will return an error if the context's Done channel is closed.
//
// This method is safe to call concurrently.
func (r *PeriodicReader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error {
if rm == nil {
Expand Down
48 changes: 48 additions & 0 deletions sdk/metric/periodic_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/suite"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
Expand Down Expand Up @@ -377,3 +378,50 @@ func TestPeriodiclReaderTemporality(t *testing.T) {
})
}
}

func TestPeriodicReaderCollect(t *testing.T) {
expiredCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-1))
defer cancel()

tests := []struct {
name string

ctx context.Context
resourceMetrics *metricdata.ResourceMetrics

expectedErr error
}{
{
name: "with a valid context",

ctx: context.Background(),
resourceMetrics: &metricdata.ResourceMetrics{},
},
{
name: "with an expired context",

ctx: expiredCtx,
resourceMetrics: &metricdata.ResourceMetrics{},

expectedErr: context.DeadlineExceeded,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rdr := NewPeriodicReader(new(fnExporter))
mp := NewMeterProvider(WithReader(rdr))
meter := mp.Meter("test")

// Ensure the pipeline has a callback setup
testM, err := meter.Int64ObservableCounter("test")
assert.NoError(t, err)
_, err = meter.RegisterCallback(func(_ context.Context, o metric.Observer) error {
return nil
}, testM)
assert.NoError(t, err)

assert.Equal(t, tt.expectedErr, rdr.Collect(tt.ctx, tt.resourceMetrics))
})
}
}
3 changes: 2 additions & 1 deletion sdk/metric/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ type Reader interface {
// the SDK and stores it in out. An error is returned if this is called
// after Shutdown or if out is nil.
//
// This method needs to be concurrent safe.
// This method needs to be concurrent safe, and the cancelation of the
// passed context is expected to be honored.
Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error

// ForceFlush flushes all metric measurements held in an export pipeline.
Expand Down

0 comments on commit 914a931

Please sign in to comment.