Skip to content

Commit

Permalink
Instrument batch processor with Otel Go
Browse files Browse the repository at this point in the history
  • Loading branch information
paivagustavo committed Oct 27, 2022
1 parent 3262fd8 commit b440529
Show file tree
Hide file tree
Showing 7 changed files with 630 additions and 249 deletions.
16 changes: 16 additions & 0 deletions .chloggen/instrument-batch-processor-with-otel-go.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: batchprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "instrument the `batch` processor with OpenTelemetry Go SDK"

# One or more tracking issues or pull requests related to the change
issues: [6423]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
48 changes: 22 additions & 26 deletions processor/batchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@ package batchprocessor // import "go.opentelemetry.io/collector/processor/batchp

import (
"context"
"fmt"
"runtime"
"sync"
"time"

"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
Expand Down Expand Up @@ -54,7 +54,7 @@ type batchProcessor struct {
shutdownC chan struct{}
goroutines sync.WaitGroup

telemetryLevel configtelemetry.Level
telemetry *batchProcessorTelemetry
}

type batch interface {
Expand All @@ -72,15 +72,16 @@ var _ consumer.Traces = (*batchProcessor)(nil)
var _ consumer.Metrics = (*batchProcessor)(nil)
var _ consumer.Logs = (*batchProcessor)(nil)

func newBatchProcessor(set component.ProcessorCreateSettings, cfg *Config, batch batch, telemetryLevel configtelemetry.Level) (*batchProcessor, error) {
exportCtx, err := tag.New(context.Background(), tag.Insert(processorTagKey, cfg.ID().String()))
func newBatchProcessor(set component.ProcessorCreateSettings, cfg *Config, batch batch, telemetryLevel configtelemetry.Level, registry *featuregate.Registry) (*batchProcessor, error) {
bpt, err := newBatchProcessorTelemetry(set.MeterProvider, cfg, telemetryLevel, registry)
if err != nil {
return nil, err
return nil, fmt.Errorf("error to create batch processor telemetry %w", err)
}

return &batchProcessor{
logger: set.Logger,
exportCtx: exportCtx,
telemetryLevel: telemetryLevel,
logger: set.Logger,
exportCtx: bpt.exportCtx,
telemetry: bpt,

sendBatchSize: int(cfg.SendBatchSize),
sendBatchMaxSize: int(cfg.SendBatchMaxSize),
Expand Down Expand Up @@ -130,7 +131,7 @@ func (bp *batchProcessor) startProcessingCycle() {
if bp.batch.itemCount() > 0 {
// TODO: Set a timeout on sendTraces or
// make it cancellable using the context that Shutdown gets as a parameter
bp.sendItems(statTimeoutTriggerSend)
bp.sendItems(triggerTimeout)
}
return
case item := <-bp.newItem:
Expand All @@ -140,7 +141,7 @@ func (bp *batchProcessor) startProcessingCycle() {
bp.processItem(item)
case <-bp.timer.C:
if bp.batch.itemCount() > 0 {
bp.sendItems(statTimeoutTriggerSend)
bp.sendItems(triggerTimeout)
}
bp.resetTimer()
}
Expand All @@ -152,7 +153,7 @@ func (bp *batchProcessor) processItem(item interface{}) {
sent := false
for bp.batch.itemCount() >= bp.sendBatchSize {
sent = true
bp.sendItems(statBatchSizeTriggerSend)
bp.sendItems(triggerBatchSize)
}

if sent {
Expand All @@ -171,17 +172,12 @@ func (bp *batchProcessor) resetTimer() {
bp.timer.Reset(bp.timeout)
}

func (bp *batchProcessor) sendItems(triggerMeasure *stats.Int64Measure) {
detailed := bp.telemetryLevel == configtelemetry.LevelDetailed
sent, bytes, err := bp.batch.export(bp.exportCtx, bp.sendBatchMaxSize, detailed)
func (bp *batchProcessor) sendItems(trigger trigger) {
sent, bytes, err := bp.batch.export(bp.exportCtx, bp.sendBatchMaxSize, bp.telemetry.detailed)
if err != nil {
bp.logger.Warn("Sender failed", zap.Error(err))
} else {
// Add that it came form the trace pipeline?
stats.Record(bp.exportCtx, triggerMeasure.M(1), statBatchSendSize.M(int64(sent)))
if detailed {
stats.Record(bp.exportCtx, statBatchSendSizeBytes.M(int64(bytes)))
}
bp.telemetry.record(trigger, int64(sent), int64(bytes))
}
}

Expand All @@ -205,18 +201,18 @@ func (bp *batchProcessor) ConsumeLogs(_ context.Context, ld plog.Logs) error {
}

// newBatchTracesProcessor creates a new batch processor that batches traces by size or with timeout
func newBatchTracesProcessor(set component.ProcessorCreateSettings, next consumer.Traces, cfg *Config, telemetryLevel configtelemetry.Level) (*batchProcessor, error) {
return newBatchProcessor(set, cfg, newBatchTraces(next), telemetryLevel)
func newBatchTracesProcessor(set component.ProcessorCreateSettings, next consumer.Traces, cfg *Config, telemetryLevel configtelemetry.Level, registry *featuregate.Registry) (*batchProcessor, error) {
return newBatchProcessor(set, cfg, newBatchTraces(next), telemetryLevel, registry)
}

// newBatchMetricsProcessor creates a new batch processor that batches metrics by size or with timeout
func newBatchMetricsProcessor(set component.ProcessorCreateSettings, next consumer.Metrics, cfg *Config, telemetryLevel configtelemetry.Level) (*batchProcessor, error) {
return newBatchProcessor(set, cfg, newBatchMetrics(next), telemetryLevel)
func newBatchMetricsProcessor(set component.ProcessorCreateSettings, next consumer.Metrics, cfg *Config, telemetryLevel configtelemetry.Level, registry *featuregate.Registry) (*batchProcessor, error) {
return newBatchProcessor(set, cfg, newBatchMetrics(next), telemetryLevel, registry)
}

// newBatchLogsProcessor creates a new batch processor that batches logs by size or with timeout
func newBatchLogsProcessor(set component.ProcessorCreateSettings, next consumer.Logs, cfg *Config, telemetryLevel configtelemetry.Level) (*batchProcessor, error) {
return newBatchProcessor(set, cfg, newBatchLogs(next), telemetryLevel)
func newBatchLogsProcessor(set component.ProcessorCreateSettings, next consumer.Logs, cfg *Config, telemetryLevel configtelemetry.Level, registry *featuregate.Registry) (*batchProcessor, error) {
return newBatchProcessor(set, cfg, newBatchLogs(next), telemetryLevel, registry)
}

type batchTraces struct {
Expand Down
Loading

0 comments on commit b440529

Please sign in to comment.