Skip to content

Commit

Permalink
[chore] Merge observability report sender for all signals, remove dup…
Browse files Browse the repository at this point in the history
…licate code

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Jan 23, 2025
1 parent 808e4a8 commit e77201b
Show file tree
Hide file tree
Showing 16 changed files with 227 additions and 405 deletions.
6 changes: 3 additions & 3 deletions exporter/exporterhelper/internal/base_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type BaseExporter struct {
}

func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSenderFactory, options ...Option) (*BaseExporter, error) {
obsReport, err := NewExporter(ObsReportSettings{ExporterSettings: set, Signal: signal})
obsReport, err := NewObsReport(ObsReportSettings{ExporterSettings: set, Signal: signal})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -121,7 +121,7 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe
return be, nil
}

// send sends the request using the first sender in the chain.
// Send sends the request using the first sender in the chain.
func (be *BaseExporter) Send(ctx context.Context, req internal.Request) error {
err := be.QueueSender.Send(ctx, req)
if err != nil {
Expand Down Expand Up @@ -282,7 +282,7 @@ func WithMarshaler(marshaler exporterqueue.Marshaler[internal.Request]) Option {
}
}

// withUnmarshaler is used to set the request unmarshaler for the new exporter helper.
// WithUnmarshaler is used to set the request unmarshaler for the new exporter helper.
// It must be provided as the first option when creating a new exporter helper.
func WithUnmarshaler(unmarshaler exporterqueue.Unmarshaler[internal.Request]) Option {
return func(o *BaseExporter) error {
Expand Down
141 changes: 141 additions & 0 deletions exporter/exporterhelper/internal/obs_report.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal"

import (
"context"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"

"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata"
"go.opentelemetry.io/collector/pipeline"
)

const (
// spanNameSep is duplicate between receiver and exporter.
spanNameSep = "/"

// ExporterKey used to identify exporters in metrics and traces.
ExporterKey = "exporter"

// DataTypeKey used to identify the data type in the queue size metric.
DataTypeKey = "data_type"

// ItemsSent used to track number of items sent by exporters.
ItemsSent = "items.sent"
// ItemsFailed used to track number of items that failed to be sent by exporters.
ItemsFailed = "items.failed"
)

// ObsReport is a helper to add observability to an exporter.
type ObsReport struct {
spanName string
tracer trace.Tracer
Signal pipeline.Signal

spanAttrs trace.SpanStartEventOption
metricAttr metric.MeasurementOption
TelemetryBuilder *metadata.TelemetryBuilder
enqueueFailedInst metric.Int64Counter
itemsSentInst metric.Int64Counter
itemsFailedInst metric.Int64Counter
}

// ObsReportSettings are settings for creating an ObsReport.
type ObsReportSettings struct {
ExporterSettings exporter.Settings
Signal pipeline.Signal
}

func NewObsReport(set ObsReportSettings) (*ObsReport, error) {
telemetryBuilder, err := metadata.NewTelemetryBuilder(set.ExporterSettings.TelemetrySettings)
if err != nil {
return nil, err
}

Check warning on line 59 in exporter/exporterhelper/internal/obs_report.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/obs_report.go#L58-L59

Added lines #L58 - L59 were not covered by tests

idStr := set.ExporterSettings.ID.String()
expAttr := attribute.String(ExporterKey, idStr)

or := &ObsReport{
spanName: ExporterKey + spanNameSep + idStr + spanNameSep + set.Signal.String(),
tracer: metadata.Tracer(set.ExporterSettings.TelemetrySettings),
Signal: set.Signal,
spanAttrs: trace.WithAttributes(expAttr, attribute.String(DataTypeKey, set.Signal.String())),
metricAttr: metric.WithAttributeSet(attribute.NewSet(expAttr)),
TelemetryBuilder: telemetryBuilder,
}

switch set.Signal {
case pipeline.SignalTraces:
or.enqueueFailedInst = or.TelemetryBuilder.ExporterEnqueueFailedSpans
or.itemsSentInst = or.TelemetryBuilder.ExporterSentSpans
or.itemsFailedInst = or.TelemetryBuilder.ExporterSendFailedSpans

case pipeline.SignalMetrics:
or.enqueueFailedInst = or.TelemetryBuilder.ExporterEnqueueFailedMetricPoints
or.itemsSentInst = or.TelemetryBuilder.ExporterSentMetricPoints
or.itemsFailedInst = or.TelemetryBuilder.ExporterSendFailedMetricPoints

case pipeline.SignalLogs:
or.enqueueFailedInst = or.TelemetryBuilder.ExporterEnqueueFailedLogRecords
or.itemsSentInst = or.TelemetryBuilder.ExporterSentLogRecords
or.itemsFailedInst = or.TelemetryBuilder.ExporterSendFailedLogRecords
}

return or, nil
}

// StartOp creates the span used to trace the operation. Returning
// the updated context and the created span.
func (or *ObsReport) StartOp(ctx context.Context) context.Context {
ctx, _ = or.tracer.Start(ctx, or.spanName, or.spanAttrs)
return ctx
}

// EndOp completes the export operation that was started with StartOp.
func (or *ObsReport) EndOp(ctx context.Context, numLogRecords int, err error) {
numSent, numFailedToSend := toNumItems(numLogRecords, err)

// No metrics recorded for profiles.
if or.itemsSentInst != nil {
or.itemsSentInst.Add(ctx, numSent, or.metricAttr)
}
// No metrics recorded for profiles.
if or.itemsFailedInst != nil {
or.itemsFailedInst.Add(ctx, numFailedToSend, or.metricAttr)
}

span := trace.SpanFromContext(ctx)
defer span.End()
// End the span according to errors.
if span.IsRecording() {
span.SetAttributes(
attribute.Int64(ItemsSent, numSent),
attribute.Int64(ItemsFailed, numFailedToSend),
)
if err != nil {
span.SetStatus(codes.Error, err.Error())
}
}
}

func toNumItems(numExportedItems int, err error) (int64, int64) {
if err != nil {
return 0, int64(numExportedItems)
}
return int64(numExportedItems), 0
}

func (or *ObsReport) RecordEnqueueFailure(ctx context.Context, failed int64) {
// No metrics recorded for profiles.
if or.enqueueFailedInst == nil {
return
}

Check warning on line 138 in exporter/exporterhelper/internal/obs_report.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/obs_report.go#L137-L138

Added lines #L137 - L138 were not covered by tests

or.enqueueFailedInst.Add(ctx, failed, or.metricAttr)
}
28 changes: 28 additions & 0 deletions exporter/exporterhelper/internal/obs_report_sender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal"

import (
"context"

"go.opentelemetry.io/collector/exporter/internal"
)

type obsReportSender[K internal.Request] struct {
BaseSender[K]
obsrep *ObsReport
}

func NewObsReportSender[K internal.Request](obsrep *ObsReport) Sender[K] {
return &obsReportSender[K]{obsrep: obsrep}
}

func (ors *obsReportSender[K]) Send(ctx context.Context, req K) error {
c := ors.obsrep.StartOp(ctx)
items := req.ItemsCount()
// Forward the data to the next consumer (this pusher is the next).
err := ors.NextSender.Send(c, req)
ors.obsrep.EndOp(c, items, err)
return err
}
Loading

0 comments on commit e77201b

Please sign in to comment.