Skip to content

Commit

Permalink
address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
omrozowicz-splunk committed Sep 29, 2023
1 parent 904a6b6 commit a8ec112
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 48 deletions.
19 changes: 9 additions & 10 deletions exporter/exportertest/contract_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ package exportertest // import "go.opentelemetry.io/collector/exporter/exportert
import (
"context"
"fmt"
"github.com/stretchr/testify/assert"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -80,7 +80,8 @@ func CheckConsumeContract(params CheckConsumeContractParams) {
}

func checkConsumeContractScenario(params CheckConsumeContractParams, decisionFunc func() error, checkIfTestPassed func(*testing.T, int, RequestCounter)) {
rcv := params.MockReceiverFactory(decisionFunc)
mockConsumerInstance := NewMockConsumer(decisionFunc)
rcv := params.MockReceiverFactory(mockConsumerInstance)
switch params.DataType {
case component.DataTypeLogs:
checkLogs(params, rcv, checkIfTestPassed)
Expand Down Expand Up @@ -116,7 +117,7 @@ func checkMetrics(params CheckConsumeContractParams, mockReceiver MockReceiver,
for i := 0; i < params.NumberOfTestElements; i++ {
id := UniqueIDAttrVal(strconv.Itoa(i))
fmt.Println("Preparing metric number: ", id)
data := CreateOneMetricWithID(id)
data := createOneMetricWithID(id)

err = exp.ConsumeMetrics(ctx, data)
}
Expand Down Expand Up @@ -157,12 +158,11 @@ func checkTraces(params CheckConsumeContractParams, mockReceiver MockReceiver, c
for i := 0; i < params.NumberOfTestElements; i++ {
id := UniqueIDAttrVal(strconv.Itoa(i))
fmt.Println("Preparing trace number: ", id)
data := CreateOneTraceWithID(id)
data := createOneTraceWithID(id)

err = exp.ConsumeTraces(ctx, data)
}

fmt.Println("Before request Counter")
reqCounter := mockReceiver.RequestCounter()
// The overall number of requests sent by exporter
fmt.Printf("Number of export tries: %d\n", reqCounter.total)
Expand All @@ -171,7 +171,6 @@ func checkTraces(params CheckConsumeContractParams, mockReceiver MockReceiver, c
// Number of errors that happened
fmt.Printf("Number of permanent errors: %d\n", reqCounter.error.permanent)
fmt.Printf("Number of non-permanent errors: %d\n", reqCounter.error.nonpermanent)
//checkIfTestPassed(params.T, params.NumberOfTestElements, reqCounter)

assert.EventuallyWithT(params.T, func(c *assert.CollectT) {
checkIfTestPassed(params.T, params.NumberOfTestElements, reqCounter)
Expand Down Expand Up @@ -200,7 +199,7 @@ func checkLogs(params CheckConsumeContractParams, mockReceiver MockReceiver, che
for i := 0; i < params.NumberOfTestElements; i++ {
id := UniqueIDAttrVal(strconv.Itoa(i))
fmt.Println("Preparing log number: ", id)
data := CreateOneLogWithID(id)
data := createOneLogWithID(id)

err = exp.ConsumeLogs(ctx, data)
}
Expand Down Expand Up @@ -248,7 +247,7 @@ func randomErrorConsumeDecisionPassed(t *testing.T, allRecordsNumber int, reqCou
require.Equal(t, reqCounter.total, allRecordsNumber+reqCounter.error.nonpermanent)
}

func CreateOneLogWithID(id UniqueIDAttrVal) plog.Logs {
func createOneLogWithID(id UniqueIDAttrVal) plog.Logs {
data := plog.NewLogs()
data.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Attributes().PutStr(
UniqueIDAttrName,
Expand All @@ -257,7 +256,7 @@ func CreateOneLogWithID(id UniqueIDAttrVal) plog.Logs {
return data
}

func CreateOneTraceWithID(id UniqueIDAttrVal) ptrace.Traces {
func createOneTraceWithID(id UniqueIDAttrVal) ptrace.Traces {
data := ptrace.NewTraces()
data.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty().Attributes().PutStr(
UniqueIDAttrName,
Expand All @@ -266,7 +265,7 @@ func CreateOneTraceWithID(id UniqueIDAttrVal) ptrace.Traces {
return data
}

func CreateOneMetricWithID(id UniqueIDAttrVal) pmetric.Metrics {
func createOneMetricWithID(id UniqueIDAttrVal) pmetric.Metrics {
data := pmetric.NewMetrics()
data.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptyHistogram().DataPoints().AppendEmpty().Attributes().PutStr(UniqueIDAttrName, string(id))
return data
Expand Down
39 changes: 18 additions & 21 deletions exporter/exportertest/mock_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ import (
"math/rand"
"sync"

"go.opentelemetry.io/collector/consumer"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
Expand All @@ -23,15 +22,13 @@ import (
var errNonPermanent = status.Error(codes.DeadlineExceeded, "non Permanent error")
var errPermanent = status.Error(codes.Internal, "Permanent error")

type DecisionFunc func() error

type MockReceiver interface {
Start() error
Stop() error
RequestCounter() RequestCounter
}

type MockReceiverFactory func(DecisionFunc) MockReceiver
type MockReceiverFactory func(consumer MockConsumer) MockReceiver

// // randomNonPermanentErrorConsumeDecision is a decision function that succeeds approximately
// // half of the time and fails with a non-permanent error the rest of the time.
Expand Down Expand Up @@ -66,14 +63,16 @@ func randomErrorsConsumeDecision() error {
return nil
}

type BaseMockConsumer interface {
ConsumeLogs(_ context.Context, ld plog.Logs)
ConsumeMetrics(_ context.Context, md pmetric.Metrics)
ConsumeTraces(_ context.Context, td ptrace.Traces)
type MockConsumer interface {
ConsumeLogs(_ context.Context, ld plog.Logs) error
ConsumeMetrics(_ context.Context, md pmetric.Metrics) error
ConsumeTraces(_ context.Context, td ptrace.Traces) error
Capabilities() consumer.Capabilities
Clear()
RequestCounter() RequestCounter
}

type MockConsumer struct {
type mockConsumer struct {
reqCounter RequestCounter
mux sync.Mutex
exportErrorFunction func() error
Expand All @@ -82,10 +81,8 @@ type MockConsumer struct {
ReceivedLogs []plog.Logs
}

type MockConsumerFactory func() MockConsumer

func CreateDefaultConsumer(decisionFunc DecisionFunc) MockConsumer {
return MockConsumer{
func NewMockConsumer(decisionFunc func() error) MockConsumer {
return &mockConsumer{
reqCounter: newRequestCounter(),
mux: sync.Mutex{},
exportErrorFunction: decisionFunc,
Expand Down Expand Up @@ -121,7 +118,7 @@ func newRequestCounter() RequestCounter {
}
}

func (r *MockConsumer) ConsumeLogs(_ context.Context, ld plog.Logs) error {
func (r *mockConsumer) ConsumeLogs(_ context.Context, ld plog.Logs) error {
r.mux.Lock()
defer r.mux.Unlock()
r.reqCounter.total++
Expand All @@ -137,7 +134,7 @@ func (r *MockConsumer) ConsumeLogs(_ context.Context, ld plog.Logs) error {
return nil
}

func (r *MockConsumer) ConsumeTraces(_ context.Context, td ptrace.Traces) error {
func (r *mockConsumer) ConsumeTraces(_ context.Context, td ptrace.Traces) error {
r.mux.Lock()
defer r.mux.Unlock()
r.reqCounter.total++
Expand All @@ -153,7 +150,7 @@ func (r *MockConsumer) ConsumeTraces(_ context.Context, td ptrace.Traces) error
return nil
}

func (r *MockConsumer) ConsumeMetrics(_ context.Context, md pmetric.Metrics) error {
func (r *mockConsumer) ConsumeMetrics(_ context.Context, md pmetric.Metrics) error {
r.mux.Lock()
defer r.mux.Unlock()
r.reqCounter.total++
Expand All @@ -169,11 +166,11 @@ func (r *MockConsumer) ConsumeMetrics(_ context.Context, md pmetric.Metrics) err
return nil
}

func (r *MockConsumer) Capabilities() consumer.Capabilities {
func (r *mockConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{}
}

func (r *MockConsumer) processError(err error, dataType string, idOfElement string) {
func (r *mockConsumer) processError(err error, dataType string, idOfElement string) {
if consumererror.IsPermanent(err) {
fmt.Println("permanent error happened")
fmt.Printf("Dropping %s number: %s\n", dataType, idOfElement)
Expand All @@ -185,13 +182,13 @@ func (r *MockConsumer) processError(err error, dataType string, idOfElement stri
}
}

func (r *MockConsumer) Clear() {
func (r *mockConsumer) Clear() {
r.mux.Lock()
defer r.mux.Unlock()
r.reqCounter = newRequestCounter()
}

func (r *MockConsumer) RequestCounter() RequestCounter {
func (r *mockConsumer) RequestCounter() RequestCounter {
return r.reqCounter
}

Expand Down
10 changes: 3 additions & 7 deletions exporter/otlpexporter/consume_contract_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@
package otlpexporter

import (
"fmt"
"testing"
"time"

"go.opentelemetry.io/collector/config/confignet"

"github.com/cenkalti/backoff/v4"

"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -38,17 +35,16 @@ func testConfig() component.Config {
QueueSettings: exporterhelper.QueueSettings{Enabled: false},
RetrySettings: newTestRetrySettings(),
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: confignet.NetAddr{Endpoint: fmt.Sprintf("0.0.0.0:4317"), Transport: "tcp"}.Endpoint,
Endpoint: "0.0.0.0:4317",
TLSSetting: configtls.TLSClientSetting{
Insecure: true,
}},
}
}

// Define a function that matches the MockReceiverFactory signature
func createMockOtlpReceiver(decisionFunc exportertest.DecisionFunc) exportertest.MockReceiver {
mockConsumer := exportertest.CreateDefaultConsumer(decisionFunc)
rcv := newOTLPDataReceiver(&mockConsumer)
func createMockOtlpReceiver(mockConsumer exportertest.MockConsumer) exportertest.MockReceiver {
rcv := newOTLPDataReceiver(mockConsumer)
err := rcv.Start()
if err != nil {
return nil
Expand Down
16 changes: 6 additions & 10 deletions exporter/otlpexporter/mock_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package otlpexporter // import "go.opentelemetry.io/collector/exporter/otlpexpor

import (
"context"
"errors"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/receiver"
Expand All @@ -13,13 +14,13 @@ import (
)

type mockOtlpReceiver struct {
mockConsumer *exportertest.MockConsumer
mockConsumer exportertest.MockConsumer
traceReceiver receiver.Traces
metricsReceiver receiver.Metrics
logReceiver receiver.Logs
}

func newOTLPDataReceiver(mockConsumer *exportertest.MockConsumer) *mockOtlpReceiver {
func newOTLPDataReceiver(mockConsumer exportertest.MockConsumer) *mockOtlpReceiver {
return &mockOtlpReceiver{
mockConsumer: mockConsumer,
}
Expand All @@ -28,7 +29,6 @@ func newOTLPDataReceiver(mockConsumer *exportertest.MockConsumer) *mockOtlpRecei
func (bor *mockOtlpReceiver) Start() error {
factory := otlpreceiver.NewFactory()
cfg := factory.CreateDefaultConfig().(*otlpreceiver.Config)
//cfg.GRPC.NetAddr = confignet.NetAddr{Endpoint: fmt.Sprintf("127.0.0.1:%d", bor.port), Transport: "tcp"}
cfg.HTTP = nil
var err error
set := receivertest.NewNopCreateSettings()
Expand All @@ -53,13 +53,9 @@ func (bor *mockOtlpReceiver) Start() error {

func (bor *mockOtlpReceiver) Stop() error {
bor.mockConsumer.Clear()
if err := bor.traceReceiver.Shutdown(context.Background()); err != nil {
return err
}
if err := bor.metricsReceiver.Shutdown(context.Background()); err != nil {
return err
}
return bor.logReceiver.Shutdown(context.Background())
err := bor.traceReceiver.Shutdown(context.Background())
err = errors.Join(err, bor.metricsReceiver.Shutdown(context.Background()))
return errors.Join(err, bor.logReceiver.Shutdown(context.Background()))
}

func (bor *mockOtlpReceiver) RequestCounter() exportertest.RequestCounter {
Expand Down

0 comments on commit a8ec112

Please sign in to comment.