Skip to content

Commit

Permalink
Merge observability report sender for all signals, remove duplicate c…
Browse files Browse the repository at this point in the history
…ode (#12165)

Depends on
#12164

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu authored Jan 23, 2025
1 parent c24f9dc commit 765fcef
Show file tree
Hide file tree
Showing 17 changed files with 252 additions and 405 deletions.
25 changes: 25 additions & 0 deletions .chloggen/merge-obs-report.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Rename exporter span signal specific attributes (e.g. "sent_spans" / "send_failed_span") to "items.sent" / "items.failed".

# One or more tracking issues or pull requests related to the change
issues: [12165]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api, user]
6 changes: 3 additions & 3 deletions exporter/exporterhelper/internal/base_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type BaseExporter struct {
}

func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSenderFactory, options ...Option) (*BaseExporter, error) {
obsReport, err := NewExporter(ObsReportSettings{ExporterSettings: set, Signal: signal})
obsReport, err := NewObsReport(ObsReportSettings{ExporterSettings: set, Signal: signal})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -121,7 +121,7 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe
return be, nil
}

// send sends the request using the first sender in the chain.
// Send sends the request using the first sender in the chain.
func (be *BaseExporter) Send(ctx context.Context, req internal.Request) error {
err := be.QueueSender.Send(ctx, req)
if err != nil {
Expand Down Expand Up @@ -282,7 +282,7 @@ func WithMarshaler(marshaler exporterqueue.Marshaler[internal.Request]) Option {
}
}

// withUnmarshaler is used to set the request unmarshaler for the new exporter helper.
// WithUnmarshaler is used to set the request unmarshaler for the new exporter helper.
// It must be provided as the first option when creating a new exporter helper.
func WithUnmarshaler(unmarshaler exporterqueue.Unmarshaler[internal.Request]) Option {
return func(o *BaseExporter) error {
Expand Down
141 changes: 141 additions & 0 deletions exporter/exporterhelper/internal/obs_report.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal"

import (
"context"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"

"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata"
"go.opentelemetry.io/collector/pipeline"
)

const (
// spanNameSep is duplicate between receiver and exporter.
spanNameSep = "/"

// ExporterKey used to identify exporters in metrics and traces.
ExporterKey = "exporter"

// DataTypeKey used to identify the data type in the queue size metric.
DataTypeKey = "data_type"

// ItemsSent used to track number of items sent by exporters.
ItemsSent = "items.sent"
// ItemsFailed used to track number of items that failed to be sent by exporters.
ItemsFailed = "items.failed"
)

// ObsReport is a helper to add observability to an exporter.
type ObsReport struct {
spanName string
tracer trace.Tracer
Signal pipeline.Signal

spanAttrs trace.SpanStartEventOption
metricAttr metric.MeasurementOption
TelemetryBuilder *metadata.TelemetryBuilder
enqueueFailedInst metric.Int64Counter
itemsSentInst metric.Int64Counter
itemsFailedInst metric.Int64Counter
}

// ObsReportSettings are settings for creating an ObsReport.
type ObsReportSettings struct {
ExporterSettings exporter.Settings
Signal pipeline.Signal
}

func NewObsReport(set ObsReportSettings) (*ObsReport, error) {
telemetryBuilder, err := metadata.NewTelemetryBuilder(set.ExporterSettings.TelemetrySettings)
if err != nil {
return nil, err
}

idStr := set.ExporterSettings.ID.String()
expAttr := attribute.String(ExporterKey, idStr)

or := &ObsReport{
spanName: ExporterKey + spanNameSep + idStr + spanNameSep + set.Signal.String(),
tracer: metadata.Tracer(set.ExporterSettings.TelemetrySettings),
Signal: set.Signal,
spanAttrs: trace.WithAttributes(expAttr, attribute.String(DataTypeKey, set.Signal.String())),
metricAttr: metric.WithAttributeSet(attribute.NewSet(expAttr)),
TelemetryBuilder: telemetryBuilder,
}

switch set.Signal {
case pipeline.SignalTraces:
or.enqueueFailedInst = or.TelemetryBuilder.ExporterEnqueueFailedSpans
or.itemsSentInst = or.TelemetryBuilder.ExporterSentSpans
or.itemsFailedInst = or.TelemetryBuilder.ExporterSendFailedSpans

case pipeline.SignalMetrics:
or.enqueueFailedInst = or.TelemetryBuilder.ExporterEnqueueFailedMetricPoints
or.itemsSentInst = or.TelemetryBuilder.ExporterSentMetricPoints
or.itemsFailedInst = or.TelemetryBuilder.ExporterSendFailedMetricPoints

case pipeline.SignalLogs:
or.enqueueFailedInst = or.TelemetryBuilder.ExporterEnqueueFailedLogRecords
or.itemsSentInst = or.TelemetryBuilder.ExporterSentLogRecords
or.itemsFailedInst = or.TelemetryBuilder.ExporterSendFailedLogRecords
}

return or, nil
}

// StartOp creates the span used to trace the operation. Returning
// the updated context and the created span.
func (or *ObsReport) StartOp(ctx context.Context) context.Context {
ctx, _ = or.tracer.Start(ctx, or.spanName, or.spanAttrs)
return ctx
}

// EndOp completes the export operation that was started with StartOp.
func (or *ObsReport) EndOp(ctx context.Context, numLogRecords int, err error) {
numSent, numFailedToSend := toNumItems(numLogRecords, err)

// No metrics recorded for profiles.
if or.itemsSentInst != nil {
or.itemsSentInst.Add(ctx, numSent, or.metricAttr)
}
// No metrics recorded for profiles.
if or.itemsFailedInst != nil {
or.itemsFailedInst.Add(ctx, numFailedToSend, or.metricAttr)
}

span := trace.SpanFromContext(ctx)
defer span.End()
// End the span according to errors.
if span.IsRecording() {
span.SetAttributes(
attribute.Int64(ItemsSent, numSent),
attribute.Int64(ItemsFailed, numFailedToSend),
)
if err != nil {
span.SetStatus(codes.Error, err.Error())
}
}
}

func toNumItems(numExportedItems int, err error) (int64, int64) {
if err != nil {
return 0, int64(numExportedItems)
}
return int64(numExportedItems), 0
}

func (or *ObsReport) RecordEnqueueFailure(ctx context.Context, failed int64) {
// No metrics recorded for profiles.
if or.enqueueFailedInst == nil {
return
}

or.enqueueFailedInst.Add(ctx, failed, or.metricAttr)
}
28 changes: 28 additions & 0 deletions exporter/exporterhelper/internal/obs_report_sender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal"

import (
"context"

"go.opentelemetry.io/collector/exporter/internal"
)

type obsReportSender[K internal.Request] struct {
BaseSender[K]
obsrep *ObsReport
}

func NewObsReportSender[K internal.Request](obsrep *ObsReport) Sender[K] {
return &obsReportSender[K]{obsrep: obsrep}
}

func (ors *obsReportSender[K]) Send(ctx context.Context, req K) error {
c := ors.obsrep.StartOp(ctx)
items := req.ItemsCount()
// Forward the data to the next consumer (this pusher is the next).
err := ors.NextSender.Send(c, req)
ors.obsrep.EndOp(c, items, err)
return err
}
Loading

0 comments on commit 765fcef

Please sign in to comment.