Skip to content

Commit

Permalink
internal/stanza: Avoid writing test/mock consumers when not necessary (
Browse files Browse the repository at this point in the history
…#9802)

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu authored May 9, 2022
1 parent 10612c4 commit 65eace3
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 56 deletions.
21 changes: 11 additions & 10 deletions internal/stanza/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/obsreport"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -87,8 +88,8 @@ func BenchmarkEmitterToConsumer(b *testing.B) {

for _, wc := range workerCounts {
b.Run(fmt.Sprintf("worker_count=%d", wc), func(b *testing.B) {
consumer := &mockLogsConsumer{}
logsReceiver, err := createNoopReceiver(wc, consumer)
cl := &consumertest.LogsSink{}
logsReceiver, err := createNoopReceiver(wc, cl)
require.NoError(b, err)

err = logsReceiver.Start(context.Background(), componenttest.NewNopHost())
Expand All @@ -97,7 +98,7 @@ func BenchmarkEmitterToConsumer(b *testing.B) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
consumer.ResetReceivedCount()
cl.Reset()

go func() {
ctx := context.Background()
Expand All @@ -108,9 +109,9 @@ func BenchmarkEmitterToConsumer(b *testing.B) {

require.Eventually(b,
func() bool {
return consumer.Received() == entryCount
return cl.LogRecordCount() == entryCount
},
30*time.Second, 5*time.Millisecond, "Did not receive all logs (only received %d)", consumer.Received(),
30*time.Second, 5*time.Millisecond, "Did not receive all logs (only received %d)", cl.LogRecordCount(),
)
}
})
Expand All @@ -126,8 +127,8 @@ func TestEmitterToConsumer(t *testing.T) {

entries := complexEntriesForNDifferentHosts(entryCount, hostsCount)

consumer := &mockLogsConsumer{}
logsReceiver, err := createNoopReceiver(workerCount, consumer)
cl := &consumertest.LogsSink{}
logsReceiver, err := createNoopReceiver(workerCount, cl)
require.NoError(t, err)

err = logsReceiver.Start(context.Background(), componenttest.NewNopHost())
Expand All @@ -142,13 +143,13 @@ func TestEmitterToConsumer(t *testing.T) {

require.Eventually(t,
func() bool {
return consumer.Received() == entryCount
return cl.LogRecordCount() == entryCount
},
5*time.Second, 5*time.Millisecond, "Did not receive all logs (only received %d)", consumer.Received(),
5*time.Second, 5*time.Millisecond, "Did not receive all logs (only received %d)", cl.LogRecordCount(),
)

// Wait for a small bit of time in order to let any potential extra entries drain out of the pipeline
<-time.After(500 * time.Millisecond)

require.Equal(t, entryCount, consumer.Received())
require.Equal(t, entryCount, cl.LogRecordCount())
}
38 changes: 3 additions & 35 deletions internal/stanza/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@ import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/open-telemetry/opentelemetry-log-collection/entry"
"github.com/open-telemetry/opentelemetry-log-collection/operator"
"github.com/open-telemetry/opentelemetry-log-collection/operator/helper"
"github.com/open-telemetry/opentelemetry-log-collection/operator/transformer/noop"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/extension/experimental/storage"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"
Expand Down Expand Up @@ -77,46 +76,15 @@ func (o *UnstartableOperator) Process(ctx context.Context, entry *entry.Entry) e
return nil
}

type mockLogsConsumer struct {
received int32
}

func (m *mockLogsConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func (m *mockLogsConsumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
atomic.AddInt32(&m.received, int32(ld.LogRecordCount()))
return nil
}

func (m *mockLogsConsumer) Received() int {
ret := atomic.LoadInt32(&m.received)
return int(ret)
}

func (m *mockLogsConsumer) ResetReceivedCount() {
atomic.StoreInt32(&m.received, 0)
}

type mockLogsRejecter struct {
rejected int32
}

func (m *mockLogsRejecter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
consumertest.LogsSink
}

func (m *mockLogsRejecter) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
atomic.AddInt32(&m.rejected, 1)
_ = m.LogsSink.ConsumeLogs(ctx, ld)
return fmt.Errorf("no")
}

func (m *mockLogsRejecter) Rejected() int {
ret := atomic.LoadInt32(&m.rejected)
return int(ret)
}

const testType = "test"

type TestConfig struct {
Expand Down
19 changes: 10 additions & 9 deletions internal/stanza/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,21 @@ import (
"github.com/open-telemetry/opentelemetry-log-collection/pipeline"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.uber.org/zap"
"gopkg.in/yaml.v2"
)

func TestStart(t *testing.T) {
mockConsumer := mockLogsConsumer{}
mockConsumer := &consumertest.LogsSink{}

factory := NewFactory(TestReceiverType{})

logsReceiver, err := factory.CreateLogsReceiver(
context.Background(),
componenttest.NewNopReceiverCreateSettings(),
factory.CreateDefaultConfig(),
&mockConsumer,
mockConsumer,
)
require.NoError(t, err, "receiver should successfully build")

Expand All @@ -55,33 +56,33 @@ func TestStart(t *testing.T) {
// Eventually because of asynchronuous nature of the receiver.
require.Eventually(t,
func() bool {
return mockConsumer.Received() == 1
return mockConsumer.LogRecordCount() == 1
},
10*time.Second, 5*time.Millisecond, "one log entry expected",
)
logsReceiver.Shutdown(context.Background())
}

func TestHandleStartError(t *testing.T) {
mockConsumer := mockLogsConsumer{}
mockConsumer := &consumertest.LogsSink{}

factory := NewFactory(TestReceiverType{})

cfg := factory.CreateDefaultConfig().(*TestConfig)
cfg.Input = newUnstartableParams()

receiver, err := factory.CreateLogsReceiver(context.Background(), componenttest.NewNopReceiverCreateSettings(), cfg, &mockConsumer)
receiver, err := factory.CreateLogsReceiver(context.Background(), componenttest.NewNopReceiverCreateSettings(), cfg, mockConsumer)
require.NoError(t, err, "receiver should successfully build")

err = receiver.Start(context.Background(), componenttest.NewNopHost())
require.Error(t, err, "receiver fails to start under rare circumstances")
}

func TestHandleConsumeError(t *testing.T) {
mockConsumer := mockLogsRejecter{}
mockConsumer := &mockLogsRejecter{}
factory := NewFactory(TestReceiverType{})

logsReceiver, err := factory.CreateLogsReceiver(context.Background(), componenttest.NewNopReceiverCreateSettings(), factory.CreateDefaultConfig(), &mockConsumer)
logsReceiver, err := factory.CreateLogsReceiver(context.Background(), componenttest.NewNopReceiverCreateSettings(), factory.CreateDefaultConfig(), mockConsumer)
require.NoError(t, err, "receiver should successfully build")

err = logsReceiver.Start(context.Background(), componenttest.NewNopHost())
Expand All @@ -93,11 +94,11 @@ func TestHandleConsumeError(t *testing.T) {
// Eventually because of asynchronuous nature of the receiver.
require.Eventually(t,
func() bool {
return mockConsumer.Rejected() == 1
return mockConsumer.LogRecordCount() == 1
},
10*time.Second, 5*time.Millisecond, "one log entry expected",
)
logsReceiver.Shutdown(context.Background())
require.NoError(t, logsReceiver.Shutdown(context.Background()))
}

func BenchmarkReadLine(b *testing.B) {
Expand Down
4 changes: 2 additions & 2 deletions internal/stanza/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/otel/metric/nonrecording"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap/zaptest"
Expand Down Expand Up @@ -98,15 +99,14 @@ func createReceiver(t *testing.T) *receiver {
MetricsLevel: configtelemetry.LevelNone,
},
}
mockConsumer := mockLogsConsumer{}

factory := NewFactory(TestReceiverType{})

logsReceiver, err := factory.CreateLogsReceiver(
context.Background(),
params,
factory.CreateDefaultConfig(),
&mockConsumer,
consumertest.NewNop(),
)
require.NoError(t, err, "receiver should successfully build")

Expand Down

0 comments on commit 65eace3

Please sign in to comment.