(batchprocessor): enable parallel exports in the batch processor #90

Merged
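This change renames the package from batchprocessor to concurrentbatchprocessor and splits the old synchronous export path in two: splitBatch carves the next request off the batch synchronously, export is moved into its own goroutine so several batches can be in flight at once, and the old partialError/completeError pair collapses into a single countedError that tells each producer how many of its items an export covered. Below is a minimal runnable sketch of the fan-out side of that handshake, assuming simplified types: countedError mirrors the PR, but exportAsync and the plain export func are illustrative stand-ins, not the processor's real API.

package main

import (
	"context"
	"fmt"
)

// countedError mirrors the PR's new response type: the export error plus
// how many of the waiter's items were in the exported batch.
type countedError struct {
	err   error
	count int
}

func (ce countedError) Error() string {
	if ce.err == nil {
		return ""
	}
	return fmt.Sprintf("batch error: %s", ce.err.Error())
}

// exportAsync launches the export in a goroutine and fans the single result
// out to every waiter; waiters[i] contributed counts[i] items to this batch.
func exportAsync(ctx context.Context, export func(context.Context) error,
	waiters []chan error, counts []int) {
	go func() {
		err := export(ctx)
		for i, w := range waiters {
			w <- countedError{err: err, count: counts[i]}
		}
	}()
}

func main() {
	w1 := make(chan error, 1)
	w2 := make(chan error, 1)
	exportAsync(context.Background(),
		func(context.Context) error { return nil }, // stand-in for batch.export
		[]chan error{w1, w2}, []int{3, 5})
	e1 := (<-w1).(countedError)
	e2 := (<-w2).(countedError)
	fmt.Println(e1.count, e2.count) // each producer learns how many of its items were sent
}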
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package batchprocessor // import "github.com/open-telemetry/otel-arrow/collector/processor/batchprocessor"
package concurrentbatchprocessor // import "github.com/open-telemetry/otel-arrow/collector/processor/concurrentbatchprocessor"

import (
"context"
@@ -108,12 +108,14 @@ type pendingItem struct {
type dataItem struct {
data any
responseCh chan error
count int
}

// batch is an interface generalizing the individual signal types.
type batch interface {
// export the current batch
export(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (sentBatchSize int, sentBatchBytes int, err error)
export(ctx context.Context, req any) error
splitBatch(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (sentBatchSize int, bytes int, req any)

// itemCount returns the size of the current batch
itemCount() int
@@ -122,42 +124,19 @@ type batch interface {
add(item any)
}

// partialError is useful when a producer adds items that are split
// countedError is useful when a producer adds items that are split
// between multiple batches. This signals that producers should continue
// waiting until a completeError is received.
type partialError struct {
// waiting until all its items receive a response.
type countedError struct {
err error
count int
}

func (pe partialError) Error() string {
if pe.err == nil {
return ""
}
return fmt.Sprintf("batch partial error: %s", pe.err.Error())
}

func (pe partialError) Unwrap() error {
return pe.err
}

func isPartialError(err error) bool {
return errors.Is(err, partialError{err: errors.Unwrap(err)})
}

type completeError struct {
err error
}

func (ce completeError) Error() string {
func (ce countedError) Error() string {
if ce.err == nil {
return ""
}
return fmt.Sprintf("batch complete error: %s", ce.err.Error())

}

func (ce completeError) Unwrap() error {
return ce.err
return fmt.Sprintf("batch error: %s", ce.err.Error())
}

var _ consumer.Traces = (*batchProcessor)(nil)
@@ -323,20 +302,27 @@ func (b *shard) resetTimer() {
}

func (b *shard) sendItems(trigger trigger) {
sent, bytes, err := b.batch.export(b.exportCtx, b.processor.sendBatchMaxSize, b.processor.telemetry.detailed)
sent, bytes, req := b.batch.splitBatch(b.exportCtx, b.processor.sendBatchMaxSize, b.processor.telemetry.detailed)

var waiters []chan error
var countItems []int

numItemsBefore := b.totalSent
numItemsAfter := b.totalSent + sent

// The current batch can contain items from several different producers. Ensure each producer gets a response back.
for len(b.pending) > 0 && numItemsBefore < numItemsAfter {
// Waiter only had some items in the current batch
if numItemsBefore + b.pending[0].numItems > numItemsAfter {
b.pending[0].numItems -= (numItemsAfter - numItemsBefore)
b.pending[0].respCh <- partialError{err: err}
numItemsBefore = numItemsAfter
partialSent := numItemsAfter - numItemsBefore
b.pending[0].numItems -= partialSent
numItemsBefore += partialSent
waiters = append(waiters, b.pending[0].respCh)
countItems = append(countItems, partialSent)
} else { // waiter gets a complete response.
numItemsBefore += b.pending[0].numItems
b.pending[0].respCh <- completeError{err: err}
waiters = append(waiters, b.pending[0].respCh)
countItems = append(countItems, b.pending[0].numItems)

// complete response sent so b.pending[0] can be popped from queue.
if len(b.pending) > 1 {
@@ -347,45 +333,61 @@ func (b *shard) sendItems(trigger trigger) {
}
}

b.totalSent = numItemsAfter
go func() {
err := b.batch.export(b.exportCtx, req)
for i := range waiters {
count := countItems[i]
waiter := waiters[i]
waiter <- countedError{err: err, count: count}
}

if err != nil {
b.processor.logger.Warn("Sender failed", zap.Error(err))
} else {
b.processor.telemetry.record(trigger, int64(sent), int64(bytes))
}
}
if err != nil {
b.processor.logger.Warn("Sender failed", zap.Error(err))
} else {
b.processor.telemetry.record(trigger, int64(sent), int64(bytes))
}
}()

// singleShardBatcher is used when metadataKeys is empty, to avoid the
// additional lock and map operations used in multiBatcher.
type singleShardBatcher struct {
batcher *shard
b.totalSent = numItemsAfter
}

func (sb *singleShardBatcher) consume(ctx context.Context, data any) error {

func (b *shard) consumeAndWait(ctx context.Context, data any) error {
respCh := make(chan error, 1)
// TODO: add a semaphore to only write to channel if sizeof(data) keeps
// us below some configured inflight byte limit.
item := dataItem{
data: data,
responseCh: respCh,
}

switch telem := data.(type) {
case ptrace.Traces:
item.count = telem.SpanCount()
case pmetric.Metrics:
item.count = telem.DataPointCount()
case plog.Logs:
item.count = telem.LogRecordCount()
}

select {
case <-ctx.Done():
return ctx.Err()
case sb.batcher.newItem <- item:
case b.newItem <- item:
}
var err error

for {
select {
case newErr := <-respCh:
// nil response might be wrapped as an error.
if errors.Unwrap(newErr) != nil {
unwrap := newErr.(countedError)
if unwrap.err != nil {
err = multierr.Append(err, newErr)
}

if isPartialError(newErr) {
item.count -= unwrap.count
if item.count != 0 {
continue
}

@@ -398,6 +400,16 @@ func (sb *singleShardBatcher) consume(ctx context.Context, data any) error {
return nil
}

// singleShardBatcher is used when metadataKeys is empty, to avoid the
// additional lock and map operations used in multiBatcher.
type singleShardBatcher struct {
batcher *shard
}

func (sb *singleShardBatcher) consume(ctx context.Context, data any) error {
return sb.batcher.consumeAndWait(ctx, data)
}

func (sb *singleShardBatcher) currentMetadataCardinality() int {
return 1
}
@@ -414,7 +426,6 @@ type multiShardBatcher struct {
}

func (mb *multiShardBatcher) consume(ctx context.Context, data any) error {
respCh := make(chan error, 1)
// Get each metadata key value, form the corresponding
// attribute set for use as a map lookup key.
info := client.FromContext(ctx)
@@ -452,36 +463,7 @@ func (mb *multiShardBatcher) consume(ctx context.Context, data any) error {
mb.lock.Unlock()
}

item := dataItem{
data: data,
responseCh: respCh,
}
select {
case <-ctx.Done():
return ctx.Err()
case b.(*shard).newItem <- item:
}

var err error
for {
select {
case newErr := <-respCh:
// nil response might be wrapped as an error.
if errors.Unwrap(newErr) != nil {
err = multierr.Append(err, newErr)
}

if isPartialError(newErr) {
continue
}

return err
case <-ctx.Done():
err = multierr.Append(err, ctx.Err())
return err
}
}
return nil
return b.(*shard).consumeAndWait(ctx, data)
}

func recordBatchError(err error) error {
@@ -548,7 +530,12 @@ func (bt *batchTraces) add(item any) {
td.ResourceSpans().MoveAndAppendTo(bt.traceData.ResourceSpans())
}

func (bt *batchTraces) export(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, error) {
func (bt *batchTraces) export(ctx context.Context, req any) error {
td := req.(ptrace.Traces)
return bt.nextConsumer.ConsumeTraces(ctx, td)
}

func (bt *batchTraces) splitBatch(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, any) {
var req ptrace.Traces
var sent int
var bytes int
@@ -565,7 +552,7 @@ func (bt *batchTraces) export(ctx context.Context, sendBatchMaxSize int, returnB
if returnBytes {
bytes = bt.sizer.TracesSize(req)
}
return sent, bytes, bt.nextConsumer.ConsumeTraces(ctx, req)
return sent, bytes, req
}

func (bt *batchTraces) itemCount() int {
@@ -583,7 +570,12 @@ func newBatchMetrics(nextConsumer consumer.Metrics) *batchMetrics {
return &batchMetrics{nextConsumer: nextConsumer, metricData: pmetric.NewMetrics(), sizer: &pmetric.ProtoMarshaler{}}
}

func (bm *batchMetrics) export(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, error) {
func (bm *batchMetrics) export(ctx context.Context, req any) error {
md := req.(pmetric.Metrics)
return bm.nextConsumer.ConsumeMetrics(ctx, md)
}

func (bm *batchMetrics) splitBatch(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, any) {
var req pmetric.Metrics
var sent int
var bytes int
@@ -597,10 +589,11 @@ func (bm *batchMetrics) export(ctx context.Context, sendBatchMaxSize int, return
bm.metricData = pmetric.NewMetrics()
bm.dataPointCount = 0
}

if returnBytes {
bytes = bm.sizer.MetricsSize(req)
}
return sent, bytes, bm.nextConsumer.ConsumeMetrics(ctx, req)
return sent, bytes, req
}

func (bm *batchMetrics) itemCount() int {
@@ -629,7 +622,12 @@ func newBatchLogs(nextConsumer consumer.Logs) *batchLogs {
return &batchLogs{nextConsumer: nextConsumer, logData: plog.NewLogs(), sizer: &plog.ProtoMarshaler{}}
}

func (bl *batchLogs) export(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, error) {
func (bl *batchLogs) export(ctx context.Context, req any) error {
ld := req.(plog.Logs)
return bl.nextConsumer.ConsumeLogs(ctx, ld)
}

func (bl *batchLogs) splitBatch(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, any) {
var req plog.Logs
var sent int
var bytes int
@@ -647,7 +645,7 @@ func (bl *batchLogs) export(ctx context.Context, sendBatchMaxSize int, returnByt
if returnBytes {
bytes = bl.sizer.LogsSize(req)
}
return sent, bytes, bl.nextConsumer.ConsumeLogs(ctx, req)
return sent, bytes, req
}

func (bl *batchLogs) itemCount() int {
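The producer side of the same handshake lives in consumeAndWait above: each producer records how many spans, data points, or log records it submitted, then keeps draining its response channel, subtracting countedError.count, until every item is accounted for or the context is canceled. A minimal runnable sketch under the same simplified-type assumptions (waitForAll is illustrative, not the processor's real function; multierr is the same go.uber.org/multierr used in the diff):

package main

import (
	"context"
	"fmt"

	"go.uber.org/multierr"
)

type countedError struct {
	err   error
	count int
}

func (ce countedError) Error() string {
	if ce.err == nil {
		return ""
	}
	return fmt.Sprintf("batch error: %s", ce.err.Error())
}

// waitForAll mirrors the response loop in consumeAndWait: read countedError
// responses and decrement the outstanding item count until it reaches zero,
// accumulating any export errors along the way.
func waitForAll(ctx context.Context, respCh chan error, count int) error {
	var err error
	for {
		select {
		case newErr := <-respCh:
			ce := newErr.(countedError)
			if ce.err != nil {
				err = multierr.Append(err, newErr)
			}
			count -= ce.count
			if count != 0 {
				continue // more of this producer's items are still in flight
			}
			return err
		case <-ctx.Done():
			return multierr.Append(err, ctx.Err())
		}
	}
}

func main() {
	respCh := make(chan error, 2)
	respCh <- countedError{count: 4} // one batch carried 4 of this producer's items
	respCh <- countedError{count: 6} // a later batch carried the remaining 6
	fmt.Println(waitForAll(context.Background(), respCh, 10)) // <nil>
}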
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package batchprocessor
package concurrentbatchprocessor

import (
"context"
@@ -15,7 +15,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/open-telemetry/otel-arrow/collector/processor/batchprocessor/testdata"
"github.com/open-telemetry/otel-arrow/collector/processor/concurrentbatchprocessor/testdata"
"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
@@ -505,7 +505,8 @@ func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) {

batchMetrics.add(md)
require.Equal(t, dataPointsPerMetric*metricsCount, batchMetrics.dataPointCount)
sent, _, sendErr := batchMetrics.export(ctx, sendBatchMaxSize, true)
sent, _, req := batchMetrics.splitBatch(ctx, sendBatchMaxSize, true)
sendErr := batchMetrics.export(ctx, req)
require.NoError(t, sendErr)
require.Equal(t, sendBatchMaxSize, sent)
remainingDataPointCount := metricsCount*dataPointsPerMetric - sendBatchMaxSize
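As the updated test above shows, callers now make two calls where there used to be one: splitBatch to produce the request, then export to send it. A toy batch over []int sketching that two-method shape (all names illustrative, and the real splitBatch also returns a byte size, omitted here):

package main

import (
	"context"
	"fmt"
)

type intBatch struct {
	items []int
}

// splitBatch removes up to maxSize items from the batch, synchronously and
// under the caller's control, and returns them as the export request.
func (b *intBatch) splitBatch(_ context.Context, maxSize int) (sent int, req any) {
	n := len(b.items)
	if maxSize > 0 && n > maxSize {
		n = maxSize
	}
	out := b.items[:n]
	b.items = b.items[n:]
	return n, out
}

// export consumes a previously split request; it touches no batch state,
// which is what makes it safe to run in a goroutine while batching continues.
func (b *intBatch) export(_ context.Context, req any) error {
	fmt.Println("exporting", req.([]int))
	return nil
}

func main() {
	b := &intBatch{items: []int{1, 2, 3, 4, 5}}
	sent, req := b.splitBatch(context.Background(), 3)
	_ = b.export(context.Background(), req)
	fmt.Println("sent", sent, "remaining", len(b.items)) // sent 3 remaining 2
}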
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package batchprocessor // import "github.com/open-telemetry/otel-arrow/collector/processor/batchprocessor"
package concurrentbatchprocessor // import "github.com/open-telemetry/otel-arrow/collector/processor/concurrentbatchprocessor"

import (
"errors"
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package batchprocessor
package concurrentbatchprocessor

import (
"path/filepath"
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package batchprocessor // import "github.com/open-telemetry/otel-arrow/collector/processor/batchprocessor"
package concurrentbatchprocessor // import "github.com/open-telemetry/otel-arrow/collector/processor/concurrentbatchprocessor"

import (
"context"
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package batchprocessor
package concurrentbatchprocessor

import (
"context"