Skip to content

Commit

Permalink
Remove SetConsumerError from sink helpers (#2579)
Browse files Browse the repository at this point in the history
Fixes open-telemetry/opentelemetry-collector#2477

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu authored Mar 8, 2021
1 parent 44a8a09 commit 98a0672
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 63 deletions.
29 changes: 3 additions & 26 deletions consumer/consumertest/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,10 @@ import (
"go.opentelemetry.io/collector/consumer/pdata"
)

type baseErrorConsumer struct {
mu sync.Mutex
consumeError error // to be returned by ConsumeTraces, if set
}

// SetConsumeError sets an error that will be returned by the Consume function.
// TODO: Remove this when all calls are switched to the new ErrConsumer.
func (bec *baseErrorConsumer) SetConsumeError(err error) {
bec.mu.Lock()
defer bec.mu.Unlock()
bec.consumeError = err
}

// TracesSink is a consumer.TracesConsumer that acts like a sink that
// stores all traces and allows querying them for testing.
type TracesSink struct {
baseErrorConsumer
mu sync.Mutex
traces []pdata.Traces
spansCount int
}
Expand All @@ -50,10 +37,6 @@ func (ste *TracesSink) ConsumeTraces(_ context.Context, td pdata.Traces) error {
ste.mu.Lock()
defer ste.mu.Unlock()

if ste.consumeError != nil {
return ste.consumeError
}

ste.traces = append(ste.traces, td)
ste.spansCount += td.SpanCount()

Expand Down Expand Up @@ -89,7 +72,7 @@ func (ste *TracesSink) Reset() {
// MetricsSink is a consumer.MetricsConsumer that acts like a sink that
// stores all metrics and allows querying them for testing.
type MetricsSink struct {
baseErrorConsumer
mu sync.Mutex
metrics []pdata.Metrics
metricsCount int
}
Expand All @@ -100,9 +83,6 @@ var _ consumer.MetricsConsumer = (*MetricsSink)(nil)
func (sme *MetricsSink) ConsumeMetrics(_ context.Context, md pdata.Metrics) error {
sme.mu.Lock()
defer sme.mu.Unlock()
if sme.consumeError != nil {
return sme.consumeError
}

sme.metrics = append(sme.metrics, md)
sme.metricsCount += md.MetricCount()
Expand Down Expand Up @@ -139,7 +119,7 @@ func (sme *MetricsSink) Reset() {
// LogsSink is a consumer.LogsConsumer that acts like a sink that
// stores all logs and allows querying them for testing.
type LogsSink struct {
baseErrorConsumer
mu sync.Mutex
logs []pdata.Logs
logRecordsCount int
}
Expand All @@ -150,9 +130,6 @@ var _ consumer.LogsConsumer = (*LogsSink)(nil)
func (sle *LogsSink) ConsumeLogs(_ context.Context, ld pdata.Logs) error {
sle.mu.Lock()
defer sle.mu.Unlock()
if sle.consumeError != nil {
return sle.consumeError
}

sle.logs = append(sle.logs, ld)
sle.logRecordsCount += ld.LogRecordCount()
Expand Down
28 changes: 0 additions & 28 deletions consumer/consumertest/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package consumertest

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -41,15 +40,6 @@ func TestTracesSink(t *testing.T) {
assert.Equal(t, 0, sink.SpansCount())
}

func TestTracesSink_Error(t *testing.T) {
sink := new(TracesSink)
sink.SetConsumeError(errors.New("my error"))
td := testdata.GenerateTraceDataOneSpan()
require.Error(t, sink.ConsumeTraces(context.Background(), td))
assert.Len(t, sink.AllTraces(), 0)
assert.Equal(t, 0, sink.SpansCount())
}

func TestMetricsSink(t *testing.T) {
sink := new(MetricsSink)
md := testdata.GenerateMetricsOneMetric()
Expand All @@ -65,15 +55,6 @@ func TestMetricsSink(t *testing.T) {
assert.Equal(t, 0, sink.MetricsCount())
}

func TestMetricsSink_Error(t *testing.T) {
sink := new(MetricsSink)
sink.SetConsumeError(errors.New("my error"))
md := testdata.GenerateMetricsOneMetric()
require.Error(t, sink.ConsumeMetrics(context.Background(), md))
assert.Len(t, sink.AllMetrics(), 0)
assert.Equal(t, 0, sink.MetricsCount())
}

func TestLogsSink(t *testing.T) {
sink := new(LogsSink)
md := testdata.GenerateLogDataOneLogNoResource()
Expand All @@ -88,12 +69,3 @@ func TestLogsSink(t *testing.T) {
assert.Equal(t, 0, len(sink.AllLogs()))
assert.Equal(t, 0, sink.LogRecordsCount())
}

func TestLogsSink_Error(t *testing.T) {
sink := new(LogsSink)
sink.SetConsumeError(errors.New("my error"))
ld := testdata.GenerateLogDataOneLogNoResource()
require.Error(t, sink.ConsumeLogs(context.Background(), ld))
assert.Len(t, sink.AllLogs(), 0)
assert.Equal(t, 0, sink.LogRecordsCount())
}
75 changes: 75 additions & 0 deletions internal/internalconsumertest/err_or_sink_consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internalconsumertest

import (
"context"
"sync"

"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/consumer/pdata"
)

type ErrOrSinkConsumer struct {
*consumertest.TracesSink
*consumertest.MetricsSink
mu sync.Mutex
consumeError error // to be returned by ConsumeTraces, if set
}

// SetConsumeError sets an error that will be returned by the Consume function.
func (esc *ErrOrSinkConsumer) SetConsumeError(err error) {
esc.mu.Lock()
defer esc.mu.Unlock()
esc.consumeError = err
}

// ConsumeTraces stores traces to this sink.
func (esc *ErrOrSinkConsumer) ConsumeTraces(ctx context.Context, td pdata.Traces) error {
esc.mu.Lock()
defer esc.mu.Unlock()

if esc.consumeError != nil {
return esc.consumeError
}

return esc.TracesSink.ConsumeTraces(ctx, td)
}

// ConsumeTraces stores traces to this sink.
func (esc *ErrOrSinkConsumer) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error {
esc.mu.Lock()
defer esc.mu.Unlock()

if esc.consumeError != nil {
return esc.consumeError
}

return esc.MetricsSink.ConsumeMetrics(ctx, md)
}

// Reset deletes any stored in the sinks, resets error to nil.
func (esc *ErrOrSinkConsumer) Reset() {
esc.mu.Lock()
defer esc.mu.Unlock()

esc.consumeError = nil
if esc.TracesSink != nil {
esc.TracesSink.Reset()
}
if esc.MetricsSink != nil {
esc.MetricsSink.Reset()
}
}
5 changes: 3 additions & 2 deletions receiver/opencensusreceiver/opencensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (

"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/internal/internalconsumertest"
"go.opentelemetry.io/collector/obsreport/obsreporttest"
"go.opentelemetry.io/collector/testutil"
"go.opentelemetry.io/collector/translator/internaldata"
Expand Down Expand Up @@ -426,7 +427,7 @@ func TestOCReceiverTrace_HandleNextConsumerResponse(t *testing.T) {
require.NoError(t, err)
defer doneFn()

sink := new(consumertest.TracesSink)
sink := &internalconsumertest.ErrOrSinkConsumer{TracesSink: new(consumertest.TracesSink)}

var opts []ocOption
ocr, err := newOpenCensusReceiver(exporter.receiverTag, "tcp", addr, nil, nil, opts...)
Expand Down Expand Up @@ -575,7 +576,7 @@ func TestOCReceiverMetrics_HandleNextConsumerResponse(t *testing.T) {
require.NoError(t, err)
defer doneFn()

sink := new(consumertest.MetricsSink)
sink := &internalconsumertest.ErrOrSinkConsumer{MetricsSink: new(consumertest.MetricsSink)}

var opts []ocOption
ocr, err := newOpenCensusReceiver(exporter.receiverTag, "tcp", addr, nil, nil, opts...)
Expand Down
14 changes: 7 additions & 7 deletions receiver/otlpreceiver/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
otlpcommon "go.opentelemetry.io/collector/internal/data/protogen/common/v1"
otlpresource "go.opentelemetry.io/collector/internal/data/protogen/resource/v1"
otlptrace "go.opentelemetry.io/collector/internal/data/protogen/trace/v1"
"go.opentelemetry.io/collector/internal/internalconsumertest"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/obsreport/obsreporttest"
"go.opentelemetry.io/collector/testutil"
Expand Down Expand Up @@ -156,7 +157,7 @@ func TestJsonHttp(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)

// Set the buffer count to 1 to make it flush the test span immediately.
sink := new(consumertest.TracesSink)
sink := &internalconsumertest.ErrOrSinkConsumer{TracesSink: new(consumertest.TracesSink)}
ocr := newHTTPReceiver(t, addr, sink, nil)

require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver")
Expand All @@ -183,7 +184,7 @@ func TestJsonHttp(t *testing.T) {
}
}

func testHTTPJSONRequest(t *testing.T, url string, sink *consumertest.TracesSink, encoding string, expectedErr error) {
func testHTTPJSONRequest(t *testing.T, url string, sink *internalconsumertest.ErrOrSinkConsumer, encoding string, expectedErr error) {
var buf *bytes.Buffer
var err error
switch encoding {
Expand Down Expand Up @@ -334,9 +335,8 @@ func TestProtoHttp(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)

// Set the buffer count to 1 to make it flush the test span immediately.
tSink := new(consumertest.TracesSink)
mSink := new(consumertest.MetricsSink)
ocr := newHTTPReceiver(t, addr, tSink, mSink)
tSink := &internalconsumertest.ErrOrSinkConsumer{TracesSink: new(consumertest.TracesSink)}
ocr := newHTTPReceiver(t, addr, tSink, consumertest.NewMetricsNop())

require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver")
defer ocr.Shutdown(context.Background())
Expand Down Expand Up @@ -396,7 +396,7 @@ func createHTTPProtobufRequest(
func testHTTPProtobufRequest(
t *testing.T,
url string,
tSink *consumertest.TracesSink,
tSink *internalconsumertest.ErrOrSinkConsumer,
encoding string,
traceBytes []byte,
expectedErr error,
Expand Down Expand Up @@ -645,7 +645,7 @@ func TestOTLPReceiverTrace_HandleNextConsumerResponse(t *testing.T) {
require.NoError(t, err)
defer doneFn()

sink := new(consumertest.TracesSink)
sink := &internalconsumertest.ErrOrSinkConsumer{TracesSink: new(consumertest.TracesSink)}

ocr := newGRPCReceiver(t, exporter.receiverTag, addr, sink, nil)
require.NotNil(t, ocr)
Expand Down

0 comments on commit 98a0672

Please sign in to comment.