Skip to content

Commit

Permalink
Instrument exporters (#14)
Browse files Browse the repository at this point in the history
* Allow for dual instrumentation

* remove log

* Add back mutators
  • Loading branch information
jaronoff97 committed Oct 18, 2022
1 parent c157b12 commit 97135a3
Showing 1 changed file with 129 additions and 9 deletions.
138 changes: 129 additions & 9 deletions obsreport/obsreport_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,44 @@ import (
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
"go.opentelemetry.io/otel/metric/unit"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/internal/obsreportconfig"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
)

const (
exporterName = "exporter"

exporterScope = scopeName + nameSep + exporterName
)

// Exporter is a helper to add observability to a component.Exporter.
type Exporter struct {
level configtelemetry.Level
spanNamePrefix string
mutators []tag.Mutator
tracer trace.Tracer
meter metric.Meter
logger *zap.Logger

useOtelForMetrics bool
otelAttrs []attribute.KeyValue
sentSpans syncint64.Counter
failedToSendSpans syncint64.Counter
sentMetricPoints syncint64.Counter
failedToSendMetricPoints syncint64.Counter
sentLogRecords syncint64.Counter
failedToSendLogRecords syncint64.Counter
}

// ExporterSettings are settings for creating an Exporter.
Expand All @@ -44,12 +68,70 @@ type ExporterSettings struct {

// NewExporter creates a new Exporter.
func NewExporter(cfg ExporterSettings) *Exporter {
return &Exporter{
exp := &Exporter{
level: cfg.ExporterCreateSettings.TelemetrySettings.MetricsLevel,
spanNamePrefix: obsmetrics.ExporterPrefix + cfg.ExporterID.String(),
mutators: []tag.Mutator{tag.Upsert(obsmetrics.TagKeyExporter, cfg.ExporterID.String(), tag.WithTTL(tag.TTLNoPropagation))},
tracer: cfg.ExporterCreateSettings.TracerProvider.Tracer(cfg.ExporterID.String()),
meter: cfg.ExporterCreateSettings.MeterProvider.Meter(exporterScope),
logger: cfg.ExporterCreateSettings.Logger,

useOtelForMetrics: featuregate.GetRegistry().IsEnabled(obsreportconfig.UseOtelForInternalMetricsfeatureGateID),
otelAttrs: []attribute.KeyValue{
attribute.String(obsmetrics.ExporterKey, cfg.ExporterID.String()),
},
}
exp.createOtelMetrics()

return exp
}

func (exp *Exporter) createOtelMetrics() {
if !exp.useOtelForMetrics {
return
}

var err error
handleError := func(metricName string, err error) {
if err != nil {
exp.logger.Warn("failed to create otel instrument", zap.Error(err), zap.String("metric", metricName))
}
}
exp.sentSpans, err = exp.meter.SyncInt64().Counter(
obsmetrics.ExporterPrefix+obsmetrics.SentSpansKey,
instrument.WithDescription("Number of spans successfully sent to destination."),
instrument.WithUnit(unit.Dimensionless))
handleError(obsmetrics.ExporterPrefix+obsmetrics.SentSpansKey, err)

exp.failedToSendSpans, err = exp.meter.SyncInt64().Counter(
obsmetrics.ExporterPrefix+obsmetrics.FailedToSendSpansKey,
instrument.WithDescription("Number of spans in failed attempts to send to destination."),
instrument.WithUnit(unit.Dimensionless))
handleError(obsmetrics.ExporterPrefix+obsmetrics.FailedToSendSpansKey, err)

exp.sentMetricPoints, err = exp.meter.SyncInt64().Counter(
obsmetrics.ExporterPrefix+obsmetrics.SentMetricPointsKey,
instrument.WithDescription("Number of metric points successfully sent to destination."),
instrument.WithUnit(unit.Dimensionless))
handleError(obsmetrics.ExporterPrefix+obsmetrics.SentMetricPointsKey, err)

exp.failedToSendMetricPoints, err = exp.meter.SyncInt64().Counter(
obsmetrics.ExporterPrefix+obsmetrics.FailedToSendMetricPointsKey,
instrument.WithDescription("Number of metric points in failed attempts to send to destination."),
instrument.WithUnit(unit.Dimensionless))
handleError(obsmetrics.ExporterPrefix+obsmetrics.FailedToSendMetricPointsKey, err)

exp.sentLogRecords, err = exp.meter.SyncInt64().Counter(
obsmetrics.ExporterPrefix+obsmetrics.SentLogRecordsKey,
instrument.WithDescription("Number of log record successfully sent to destination."),
instrument.WithUnit(unit.Dimensionless))
handleError(obsmetrics.ExporterPrefix+obsmetrics.SentLogRecordsKey, err)

exp.failedToSendLogRecords, err = exp.meter.SyncInt64().Counter(
obsmetrics.ExporterPrefix+obsmetrics.FailedToSendLogRecordsKey,
instrument.WithDescription("Number of log records in failed attempts to send to destination."),
instrument.WithUnit(unit.Dimensionless))
handleError(obsmetrics.ExporterPrefix+obsmetrics.FailedToSendLogRecordsKey, err)
}

// StartTracesOp is called at the start of an Export operation.
Expand All @@ -62,7 +144,7 @@ func (exp *Exporter) StartTracesOp(ctx context.Context) context.Context {
// EndTracesOp completes the export operation that was started with StartTracesOp.
func (exp *Exporter) EndTracesOp(ctx context.Context, numSpans int, err error) {
numSent, numFailedToSend := toNumItems(numSpans, err)
exp.recordMetrics(ctx, numSent, numFailedToSend, obsmetrics.ExporterSentSpans, obsmetrics.ExporterFailedToSendSpans)
exp.recordMetrics(ctx, config.TracesDataType, numSent, numFailedToSend)
endSpan(ctx, err, numSent, numFailedToSend, obsmetrics.SentSpansKey, obsmetrics.FailedToSendSpansKey)
}

Expand All @@ -77,7 +159,7 @@ func (exp *Exporter) StartMetricsOp(ctx context.Context) context.Context {
// StartMetricsOp.
func (exp *Exporter) EndMetricsOp(ctx context.Context, numMetricPoints int, err error) {
numSent, numFailedToSend := toNumItems(numMetricPoints, err)
exp.recordMetrics(ctx, numSent, numFailedToSend, obsmetrics.ExporterSentMetricPoints, obsmetrics.ExporterFailedToSendMetricPoints)
exp.recordMetrics(ctx, config.MetricsDataType, numSent, numFailedToSend)
endSpan(ctx, err, numSent, numFailedToSend, obsmetrics.SentMetricPointsKey, obsmetrics.FailedToSendMetricPointsKey)
}

Expand All @@ -91,7 +173,7 @@ func (exp *Exporter) StartLogsOp(ctx context.Context) context.Context {
// EndLogsOp completes the export operation that was started with StartLogsOp.
func (exp *Exporter) EndLogsOp(ctx context.Context, numLogRecords int, err error) {
numSent, numFailedToSend := toNumItems(numLogRecords, err)
exp.recordMetrics(ctx, numSent, numFailedToSend, obsmetrics.ExporterSentLogRecords, obsmetrics.ExporterFailedToSendLogRecords)
exp.recordMetrics(ctx, config.LogsDataType, numSent, numFailedToSend)
endSpan(ctx, err, numSent, numFailedToSend, obsmetrics.SentLogRecordsKey, obsmetrics.FailedToSendLogRecordsKey)
}

Expand All @@ -103,16 +185,54 @@ func (exp *Exporter) startOp(ctx context.Context, operationSuffix string) contex
return ctx
}

func (exp *Exporter) recordMetrics(ctx context.Context, numSent, numFailedToSend int64, sentMeasure, failedToSendMeasure *stats.Int64Measure) {
func (exp *Exporter) recordMetrics(ctx context.Context, dataType config.DataType, numSent, numFailed int64) {
if exp.level == configtelemetry.LevelNone {
return
}
// Ignore the error for now. This should not happen.
if numFailedToSend > 0 {
_ = stats.RecordWithTags(ctx, exp.mutators, sentMeasure.M(numSent), failedToSendMeasure.M(numFailedToSend))
if exp.useOtelForMetrics {
exp.recordWithOtel(ctx, dataType, numSent, numFailed)
} else {
_ = stats.RecordWithTags(ctx, exp.mutators, sentMeasure.M(numSent))
exp.recordWithOC(ctx, dataType, numSent, numFailed)
}
}

func (exp *Exporter) recordWithOtel(ctx context.Context, dataType config.DataType, sent int64, failed int64) {
var sentMeasure, failedMeasure syncint64.Counter
switch dataType {
case config.TracesDataType:
sentMeasure = exp.sentSpans
failedMeasure = exp.failedToSendSpans
case config.MetricsDataType:
sentMeasure = exp.sentMetricPoints
failedMeasure = exp.failedToSendMetricPoints
case config.LogsDataType:
sentMeasure = exp.sentLogRecords
failedMeasure = exp.failedToSendLogRecords
}

sentMeasure.Add(ctx, sent, exp.otelAttrs...)
failedMeasure.Add(ctx, failed, exp.otelAttrs...)
}

func (exp *Exporter) recordWithOC(ctx context.Context, dataType config.DataType, sent int64, failed int64) {
var sentMeasure, failedMeasure *stats.Int64Measure
switch dataType {
case config.TracesDataType:
sentMeasure = obsmetrics.ExporterSentSpans
failedMeasure = obsmetrics.ExporterFailedToSendSpans
case config.MetricsDataType:
sentMeasure = obsmetrics.ReceiverAcceptedMetricPoints
failedMeasure = obsmetrics.ExporterFailedToSendMetricPoints
case config.LogsDataType:
sentMeasure = obsmetrics.ReceiverAcceptedLogRecords
failedMeasure = obsmetrics.ExporterFailedToSendLogRecords
}

_ = stats.RecordWithTags(
ctx,
exp.mutators,
sentMeasure.M(sent),
failedMeasure.M(failed))
}

func endSpan(ctx context.Context, err error, numSent, numFailedToSend int64, sentItemsKey, failedToSendItemsKey string) {
Expand Down

0 comments on commit 97135a3

Please sign in to comment.