Skip to content

Commit

Permalink
Make report*EnqueueFailure methods private
Browse files Browse the repository at this point in the history
By moving them to the package where they are being used. It requires some code duplication
  • Loading branch information
dmitryax committed Jun 2, 2021
1 parent 69215b9 commit 2bb405a
Show file tree
Hide file tree
Showing 12 changed files with 177 additions and 70 deletions.
4 changes: 2 additions & 2 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func WithResourceToTelemetryConversion(resourceToTelemetrySettings ResourceToTel
// baseExporter contains common fields between different exporter types.
type baseExporter struct {
component.Component
obsrep *obsreport.Exporter
obsrep *obsExporter
sender requestSender
qrSender *queuedRetrySender
}
Expand All @@ -176,7 +176,7 @@ func newBaseExporter(cfg config.Exporter, logger *zap.Logger, bs *baseSettings)
Component: componenthelper.New(bs.componentOptions...),
}

be.obsrep = obsreport.NewExporter(obsreport.ExporterSettings{
be.obsrep = newObsExporter(obsreport.ExporterSettings{
Level: configtelemetry.GetMetricsLevelFlagValue(),
ExporterID: cfg.ID(),
})
Expand Down
5 changes: 2 additions & 3 deletions exporter/exporterhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumerhelper"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/obsreport"
)

type logsRequest struct {
Expand Down Expand Up @@ -96,7 +95,7 @@ func NewLogsExporter(
req := newLogsRequest(ctx, ld, pusher)
err := be.sender.send(req)
if errors.Is(err, errSendingQueueIsFull) {
be.obsrep.RecordLogsEnqueueFailure(req.context(), req.count())
be.obsrep.recordLogsEnqueueFailure(req.context(), req.count())
}
return err
}, bs.consumerOptions...)
Expand All @@ -108,7 +107,7 @@ func NewLogsExporter(
}

type logsExporterWithObservability struct {
obsrep *obsreport.Exporter
obsrep *obsExporter
nextSender requestSender
}

Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestLogsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
}

// 2 batched must be in queue, and 5 batches (15 log records) rejected due to queue overflow
obsreporttest.CheckExporterEnqueueFailedLogs(t, fakeLogsExporterName, int64(15))
checkExporterEnqueueFailedLogsStats(t, fakeLogsExporterName, int64(15))
}

func TestLogsExporter_WithSpan(t *testing.T) {
Expand Down
5 changes: 2 additions & 3 deletions exporter/exporterhelper/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumerhelper"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/obsreport"
)

type metricsRequest struct {
Expand Down Expand Up @@ -100,7 +99,7 @@ func NewMetricsExporter(
req := newMetricsRequest(ctx, md, pusher)
err := be.sender.send(req)
if errors.Is(err, errSendingQueueIsFull) {
be.obsrep.RecordMetricsEnqueueFailure(req.context(), req.count())
be.obsrep.recordMetricsEnqueueFailure(req.context(), req.count())
}
return err
}, bs.consumerOptions...)
Expand All @@ -112,7 +111,7 @@ func NewMetricsExporter(
}

type metricsSenderWithObservability struct {
obsrep *obsreport.Exporter
obsrep *obsExporter
nextSender requestSender
}

Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func TestMetricsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
}

// 2 batched must be in queue, and 5 metric points rejected due to queue overflow
obsreporttest.CheckExporterEnqueueFailedMetrics(t, fakeMetricsExporterName, int64(5))
checkExporterEnqueueFailedMetricsStats(t, fakeMetricsExporterName, int64(5))
}

func TestMetricsExporter_WithSpan(t *testing.T) {
Expand Down
57 changes: 57 additions & 0 deletions exporter/exporterhelper/obsreport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package exporterhelper

import (
"context"

"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
"go.opentelemetry.io/collector/obsreport"
)

// TODO: Incorporate this functionality along with tests from obsreport_test.go
// into existing `obsreport` package once its functionally is not exposed
// as public API. For this part is kept private.

// obsExporter is a helper to add observability to a component.Exporter.
type obsExporter struct {
*obsreport.Exporter
mutators []tag.Mutator
}

// newObsExporter creates a new observability exporter.
func newObsExporter(cfg obsreport.ExporterSettings) *obsExporter {
return &obsExporter{
obsreport.NewExporter(cfg),
[]tag.Mutator{tag.Upsert(obsmetrics.TagKeyExporter, cfg.ExporterID.String(), tag.WithTTL(tag.TTLNoPropagation))},
}
}

// recordTracesEnqueueFailure records number of spans that failed to be added to the sending queue.
func (eor *obsExporter) recordTracesEnqueueFailure(ctx context.Context, numSpans int) {
_ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueSpans.M(int64(numSpans)))
}

// recordMetricsEnqueueFailure records number of metric points that failed to be added to the sending queue.
func (eor *obsExporter) recordMetricsEnqueueFailure(ctx context.Context, numMetricPoints int) {
_ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueMetricPoints.M(int64(numMetricPoints)))
}

// recordLogsEnqueueFailure records number of log records that failed to be added to the sending queue.
func (eor *obsExporter) recordLogsEnqueueFailure(ctx context.Context, numLogRecords int) {
_ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueLogRecords.M(int64(numLogRecords)))
}
109 changes: 109 additions & 0 deletions exporter/exporterhelper/obsreport_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package exporterhelper

import (
"context"
"reflect"
"sort"
"testing"

"github.com/stretchr/testify/require"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/obsreport/obsreporttest"
)

func TestExportEnqueueFailure(t *testing.T) {
doneFn, err := obsreporttest.SetupRecordedMetricsTest()
require.NoError(t, err)
defer doneFn()

exporter := config.NewID("fakeExporter")

obsrep := newObsExporter(obsreport.ExporterSettings{Level: configtelemetry.LevelNormal, ExporterID: exporter})

logRecords := 7
obsrep.recordLogsEnqueueFailure(context.Background(), logRecords)
checkExporterEnqueueFailedLogsStats(t, exporter, int64(logRecords))

spans := 12
obsrep.recordTracesEnqueueFailure(context.Background(), spans)
checkExporterEnqueueFailedTracesStats(t, exporter, int64(spans))

metricPoints := 21
obsrep.recordMetricsEnqueueFailure(context.Background(), metricPoints)
checkExporterEnqueueFailedMetricsStats(t, exporter, int64(metricPoints))
}

// checkExporterEnqueueFailedTracesStats checks that reported number of spans failed to enqueue match given values.
// When this function is called it is required to also call SetupRecordedMetricsTest as first thing.
func checkExporterEnqueueFailedTracesStats(t *testing.T, exporter config.ComponentID, spans int64) {
exporterTags := tagsForExporterView(exporter)
checkValueForView(t, exporterTags, spans, "exporter/enqueue_failed_spans")
}

// checkExporterEnqueueFailedMetricsStats checks that reported number of metric points failed to enqueue match given values.
// When this function is called it is required to also call SetupRecordedMetricsTest as first thing.
func checkExporterEnqueueFailedMetricsStats(t *testing.T, exporter config.ComponentID, metricPoints int64) {
exporterTags := tagsForExporterView(exporter)
checkValueForView(t, exporterTags, metricPoints, "exporter/enqueue_failed_metric_points")
}

// checkExporterEnqueueFailedLogsStats checks that reported number of log records failed to enqueue match given values.
// When this function is called it is required to also call SetupRecordedMetricsTest as first thing.
func checkExporterEnqueueFailedLogsStats(t *testing.T, exporter config.ComponentID, logRecords int64) {
exporterTags := tagsForExporterView(exporter)
checkValueForView(t, exporterTags, logRecords, "exporter/enqueue_failed_log_records")
}

// checkValueForView checks that for the current exported value in the view with the given name
// for {LegacyTagKeyReceiver: receiverName} is equal to "value".
func checkValueForView(t *testing.T, wantTags []tag.Tag, value int64, vName string) {
// Make sure the tags slice is sorted by tag keys.
sortTags(wantTags)

rows, err := view.RetrieveData(vName)
require.NoError(t, err)

for _, row := range rows {
// Make sure the tags slice is sorted by tag keys.
sortTags(row.Tags)
if reflect.DeepEqual(wantTags, row.Tags) {
sum := row.Data.(*view.SumData)
require.Equal(t, float64(value), sum.Value)
return
}
}

require.Failf(t, "could not find tags", "wantTags: %s in rows %v", wantTags, rows)
}

// tagsForExporterView returns the tags that are needed for the exporter views.
func tagsForExporterView(exporter config.ComponentID) []tag.Tag {
return []tag.Tag{
{Key: exporterTag, Value: exporter.String()},
}
}

func sortTags(tags []tag.Tag) {
sort.SliceStable(tags, func(i, j int) bool {
return tags[i].Key.Name() < tags[j].Key.Name()
})
}
5 changes: 2 additions & 3 deletions exporter/exporterhelper/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumerhelper"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/obsreport"
)

type tracesRequest struct {
Expand Down Expand Up @@ -97,7 +96,7 @@ func NewTracesExporter(
req := newTracesRequest(ctx, td, pusher)
err := be.sender.send(req)
if errors.Is(err, errSendingQueueIsFull) {
be.obsrep.RecordTracesEnqueueFailure(req.context(), req.count())
be.obsrep.recordTracesEnqueueFailure(req.context(), req.count())
}
return err
}, bs.consumerOptions...)
Expand All @@ -109,7 +108,7 @@ func NewTracesExporter(
}

type tracesExporterWithObservability struct {
obsrep *obsreport.Exporter
obsrep *obsExporter
nextSender requestSender
}

Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func TestTracesExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
}

// 2 batched must be in queue, and 5 batches (10 spans) rejected due to queue overflow
obsreporttest.CheckExporterEnqueueFailedTraces(t, fakeTracesExporterName, int64(10))
checkExporterEnqueueFailedTracesStats(t, fakeTracesExporterName, int64(10))
}

func TestTracesExporter_WithSpan(t *testing.T) {
Expand Down
15 changes: 0 additions & 15 deletions obsreport/obsreport_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,6 @@ func (eor *Exporter) EndTracesOp(ctx context.Context, numSpans int, err error) {
endSpan(ctx, err, numSent, numFailedToSend, obsmetrics.SentSpansKey, obsmetrics.FailedToSendSpansKey)
}

// RecordTracesEnqueueFailure records number of spans that failed to be added to the sending queue.
func (eor *Exporter) RecordTracesEnqueueFailure(ctx context.Context, numSpans int) {
_ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueSpans.M(int64(numSpans)))
}

// StartMetricsOp is called at the start of an Export operation.
// The returned context should be used in other calls to the Exporter functions
// dealing with the same export operation.
Expand All @@ -83,11 +78,6 @@ func (eor *Exporter) EndMetricsOp(ctx context.Context, numMetricPoints int, err
endSpan(ctx, err, numSent, numFailedToSend, obsmetrics.SentMetricPointsKey, obsmetrics.FailedToSendMetricPointsKey)
}

// RecordMetricsEnqueueFailure records number of metric points that failed to be added to the sending queue.
func (eor *Exporter) RecordMetricsEnqueueFailure(ctx context.Context, numMetricPoints int) {
_ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueMetricPoints.M(int64(numMetricPoints)))
}

// StartLogsOp is called at the start of an Export operation.
// The returned context should be used in other calls to the Exporter functions
// dealing with the same export operation.
Expand All @@ -102,11 +92,6 @@ func (eor *Exporter) EndLogsOp(ctx context.Context, numLogRecords int, err error
endSpan(ctx, err, numSent, numFailedToSend, obsmetrics.SentLogRecordsKey, obsmetrics.FailedToSendLogRecordsKey)
}

// RecordLogsEnqueueFailure records number of log records that failed to be added to the sending queue.
func (eor *Exporter) RecordLogsEnqueueFailure(ctx context.Context, numLogRecords int) {
_ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueLogRecords.M(int64(numLogRecords)))
}

// startSpan creates the span used to trace the operation. Returning
// the updated context and the created span.
func (eor *Exporter) startSpan(ctx context.Context, operationSuffix string) context.Context {
Expand Down
20 changes: 0 additions & 20 deletions obsreport/obsreport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,26 +436,6 @@ func TestExportLogsOp(t *testing.T) {
obsreporttest.CheckExporterLogs(t, exporter, int64(sentLogRecords), int64(failedToSendLogRecords))
}

func TestExportEnqueueFailure(t *testing.T) {
doneFn, err := obsreporttest.SetupRecordedMetricsTest()
require.NoError(t, err)
defer doneFn()

obsrep := NewExporter(ExporterSettings{Level: configtelemetry.LevelNormal, ExporterID: exporter})

logRecords := 7
obsrep.RecordLogsEnqueueFailure(context.Background(), logRecords)
obsreporttest.CheckExporterEnqueueFailedLogs(t, exporter, int64(logRecords))

spans := 12
obsrep.RecordTracesEnqueueFailure(context.Background(), spans)
obsreporttest.CheckExporterEnqueueFailedTraces(t, exporter, int64(spans))

metricPoints := 21
obsrep.RecordMetricsEnqueueFailure(context.Background(), metricPoints)
obsreporttest.CheckExporterEnqueueFailedMetrics(t, exporter, int64(metricPoints))
}

func TestReceiveWithLongLivedCtx(t *testing.T) {
ss := &spanStore{}
trace.RegisterExporter(ss)
Expand Down
21 changes: 0 additions & 21 deletions obsreport/obsreporttest/obsreporttest.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,6 @@ func CheckExporterTraces(t *testing.T, exporter config.ComponentID, acceptedSpan
checkValueForView(t, exporterTags, droppedSpans, "exporter/send_failed_spans")
}

// CheckExporterEnqueueFailedTraces checks that reported number of spans failed to enqueue match given values.
// When this function is called it is required to also call SetupRecordedMetricsTest as first thing.
func CheckExporterEnqueueFailedTraces(t *testing.T, exporter config.ComponentID, spans int64) {
exporterTags := tagsForExporterView(exporter)
checkValueForView(t, exporterTags, spans, "exporter/enqueue_failed_spans")
}

// CheckExporterMetrics checks that for the current exported values for metrics exporter metrics match given values.
// When this function is called it is required to also call SetupRecordedMetricsTest as first thing.
func CheckExporterMetrics(t *testing.T, exporter config.ComponentID, acceptedMetricsPoints, droppedMetricsPoints int64) {
Expand All @@ -80,13 +73,6 @@ func CheckExporterMetrics(t *testing.T, exporter config.ComponentID, acceptedMet
checkValueForView(t, exporterTags, droppedMetricsPoints, "exporter/send_failed_metric_points")
}

// CheckExporterEnqueueFailedMetrics checks that reported number of metric points failed to enqueue match given values.
// When this function is called it is required to also call SetupRecordedMetricsTest as first thing.
func CheckExporterEnqueueFailedMetrics(t *testing.T, exporter config.ComponentID, metricPoints int64) {
exporterTags := tagsForExporterView(exporter)
checkValueForView(t, exporterTags, metricPoints, "exporter/enqueue_failed_metric_points")
}

// CheckExporterLogs checks that for the current exported values for logs exporter metrics match given values.
// When this function is called it is required to also call SetupRecordedMetricsTest as first thing.
func CheckExporterLogs(t *testing.T, exporter config.ComponentID, acceptedLogRecords, droppedLogRecords int64) {
Expand All @@ -95,13 +81,6 @@ func CheckExporterLogs(t *testing.T, exporter config.ComponentID, acceptedLogRec
checkValueForView(t, exporterTags, droppedLogRecords, "exporter/send_failed_log_records")
}

// CheckExporterEnqueueFailedLogs checks that reported number of log records failed to enqueue match given values.
// When this function is called it is required to also call SetupRecordedMetricsTest as first thing.
func CheckExporterEnqueueFailedLogs(t *testing.T, exporter config.ComponentID, logRecords int64) {
exporterTags := tagsForExporterView(exporter)
checkValueForView(t, exporterTags, logRecords, "exporter/enqueue_failed_log_records")
}

// CheckProcessorTraces checks that for the current exported values for trace exporter metrics match given values.
// When this function is called it is required to also call SetupRecordedMetricsTest as first thing.
func CheckProcessorTraces(t *testing.T, processor config.ComponentID, acceptedSpans, refusedSpans, droppedSpans int64) {
Expand Down

0 comments on commit 2bb405a

Please sign in to comment.