(batchprocessor): enable parallel exports in the batch processor (#90)
Part 3 of #80

This PR:
- renames component to `concurrentbatchprocessor`
- when items are sent, `export` is called in a goroutine, so the next batch can accumulate while previous exports are still in flight
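
For orientation, a minimal self-contained sketch of the pattern this change introduces (illustrative names only, not the processor's real types): each batch snapshot is exported on its own goroutine, so a slow backend no longer blocks formation of the next batch.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// export stands in for the per-signal ConsumeTraces/ConsumeMetrics/ConsumeLogs call.
func export(ctx context.Context, batch []int) error {
	fmt.Println("exporting", len(batch), "items")
	return nil
}

func main() {
	ctx := context.Background()
	var wg sync.WaitGroup

	// Each snapshot is exported concurrently instead of serially.
	for _, batch := range [][]int{{1, 2}, {3, 4, 5}} {
		batch := batch
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := export(ctx, batch); err != nil {
				fmt.Println("sender failed:", err)
			}
		}()
	}
	wg.Wait()
}
```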

---------

Co-authored-by: Joshua MacDonald <jmacd@users.noreply.github.com>
moh-osman3 and jmacd authored Nov 8, 2023
1 parent 2b9634e commit affade9
Showing 16 changed files with 106 additions and 107 deletions.
@@ -1,7 +1,7 @@
 // Copyright The OpenTelemetry Authors
 // SPDX-License-Identifier: Apache-2.0

-package batchprocessor // import "github.com/open-telemetry/otel-arrow/collector/processor/batchprocessor"
+package concurrentbatchprocessor // import "github.com/open-telemetry/otel-arrow/collector/processor/concurrentbatchprocessor"

 import (
 	"context"
@@ -108,12 +108,14 @@ type pendingItem struct {
 type dataItem struct {
 	data       any
 	responseCh chan error
+	count      int
 }

 // batch is an interface generalizing the individual signal types.
 type batch interface {
 	// export the current batch
-	export(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (sentBatchSize int, sentBatchBytes int, err error)
+	export(ctx context.Context, req any) error
+	splitBatch(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (sentBatchSize int, bytes int, req any)

 	// itemCount returns the size of the current batch
 	itemCount() int
@@ -122,42 +124,19 @@ type batch interface {
 	add(item any)
 }

-// partialError is useful when a producer adds items that are split
+// countedError is useful when a producer adds items that are split
 // between multiple batches. This signals that producers should continue
-// waiting until a completeError is received.
-type partialError struct {
+// waiting until all its items receive a response.
+type countedError struct {
 	err error
+	count int
 }

-func (pe partialError) Error() string {
-	if pe.err == nil {
-		return ""
-	}
-	return fmt.Sprintf("batch partial error: %s", pe.err.Error())
-}
-
-func (pe partialError) Unwrap() error {
-	return pe.err
-}
-
-func isPartialError(err error) bool {
-	return errors.Is(err, partialError{err: errors.Unwrap(err)})
-}
-
-type completeError struct {
-	err error
-}
-
-func (ce completeError) Error() string {
+func (ce countedError) Error() string {
 	if ce.err == nil {
 		return ""
 	}
-	return fmt.Sprintf("batch complete error: %s", ce.err.Error())
-
-}
-
-func (ce completeError) Unwrap() error {
-	return ce.err
+	return fmt.Sprintf("batch error: %s", ce.err.Error())
 }

 var _ consumer.Traces = (*batchProcessor)(nil)
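
For context on how `countedError` is consumed: each response tells a producer how many of its items were just acknowledged, so a producer whose items were split across several batches keeps waiting until the counts sum to what it submitted. A runnable sketch with a hypothetical `waitAll` helper (the channel is simplified to carry `countedError` directly; the real code uses `chan error`, with `go.uber.org/multierr` aggregating failures as in the diff):

```go
package main

import (
	"fmt"

	"go.uber.org/multierr"
)

// countedError mirrors the type added in this diff: err may be nil on
// success, and count says how many of the producer's items it covers.
type countedError struct {
	err   error
	count int
}

func (ce countedError) Error() string {
	if ce.err == nil {
		return ""
	}
	return fmt.Sprintf("batch error: %s", ce.err.Error())
}

// waitAll is a hypothetical helper: block until `count` items have been
// acknowledged, aggregating any non-nil errors along the way.
func waitAll(respCh <-chan countedError, count int) error {
	var err error
	for count > 0 {
		resp := <-respCh
		if resp.err != nil {
			err = multierr.Append(err, resp)
		}
		count -= resp.count
	}
	return err
}

func main() {
	respCh := make(chan countedError, 2)
	// A producer's 5 items were split across two exported batches.
	respCh <- countedError{count: 2}
	respCh <- countedError{count: 3}
	fmt.Println("err:", waitAll(respCh, 5)) // err: <nil>
}
```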
@@ -323,20 +302,27 @@ func (b *shard) resetTimer() {
 }

 func (b *shard) sendItems(trigger trigger) {
-	sent, bytes, err := b.batch.export(b.exportCtx, b.processor.sendBatchMaxSize, b.processor.telemetry.detailed)
+	sent, bytes, req := b.batch.splitBatch(b.exportCtx, b.processor.sendBatchMaxSize, b.processor.telemetry.detailed)
+
+	var waiters []chan error
+	var countItems []int

 	numItemsBefore := b.totalSent
 	numItemsAfter := b.totalSent + sent

 	// The current batch can contain items from several different producers. Ensure each producer gets a response back.
 	for len(b.pending) > 0 && numItemsBefore < numItemsAfter {
 		// Waiter only had some items in the current batch
 		if numItemsBefore+b.pending[0].numItems > numItemsAfter {
-			b.pending[0].numItems -= (numItemsAfter - numItemsBefore)
-			b.pending[0].respCh <- partialError{err: err}
-			numItemsBefore = numItemsAfter
+			partialSent := numItemsAfter - numItemsBefore
+			b.pending[0].numItems -= partialSent
+			numItemsBefore += partialSent
+			waiters = append(waiters, b.pending[0].respCh)
+			countItems = append(countItems, partialSent)
 		} else { // waiter gets a complete response.
 			numItemsBefore += b.pending[0].numItems
-			b.pending[0].respCh <- completeError{err: err}
+			waiters = append(waiters, b.pending[0].respCh)
+			countItems = append(countItems, b.pending[0].numItems)

 			// complete response sent so b.pending[0] can be popped from queue.
 			if len(b.pending) > 1 {
@@ -347,45 +333,61 @@ }
 		}
 	}

-	b.totalSent = numItemsAfter
-
-	if err != nil {
-		b.processor.logger.Warn("Sender failed", zap.Error(err))
-	} else {
-		b.processor.telemetry.record(trigger, int64(sent), int64(bytes))
-	}
+	go func() {
+		err := b.batch.export(b.exportCtx, req)
+		for i := range waiters {
+			count := countItems[i]
+			waiter := waiters[i]
+			waiter <- countedError{err: err, count: count}
+		}
+
+		if err != nil {
+			b.processor.logger.Warn("Sender failed", zap.Error(err))
+		} else {
+			b.processor.telemetry.record(trigger, int64(sent), int64(bytes))
+		}
+	}()
+
+	b.totalSent = numItemsAfter
 }

-// singleShardBatcher is used when metadataKeys is empty, to avoid the
-// additional lock and map operations used in multiBatcher.
-type singleShardBatcher struct {
-	batcher *shard
-}
-
-func (sb *singleShardBatcher) consume(ctx context.Context, data any) error {
+func (b *shard) consumeAndWait(ctx context.Context, data any) error {
 	respCh := make(chan error, 1)
+	// TODO: add a semaphore to only write to channel if sizeof(data) keeps
+	// us below some configured inflight byte limit.
 	item := dataItem{
 		data:       data,
 		responseCh: respCh,
 	}

+	switch telem := data.(type) {
+	case ptrace.Traces:
+		item.count = telem.SpanCount()
+	case pmetric.Metrics:
+		item.count = telem.DataPointCount()
+	case plog.Logs:
+		item.count = telem.LogRecordCount()
+	}
+
 	select {
 	case <-ctx.Done():
 		return ctx.Err()
-	case sb.batcher.newItem <- item:
+	case b.newItem <- item:
 	}
 	var err error
+
 	for {
 		select {
 		case newErr := <-respCh:
 			// nil response might be wrapped as an error.
-			if errors.Unwrap(newErr) != nil {
+			unwrap := newErr.(countedError)
+			if unwrap.err != nil {
 				err = multierr.Append(err, newErr)
 			}

-			if isPartialError(newErr) {
+			item.count -= unwrap.count
+			if item.count != 0 {
 				continue
 			}

@@ -398,6 +400,16 @@ func (sb *singleShardBatcher) consume(ctx context.Context, data any) error {
 	return nil
 }

+// singleShardBatcher is used when metadataKeys is empty, to avoid the
+// additional lock and map operations used in multiBatcher.
+type singleShardBatcher struct {
+	batcher *shard
+}
+
+func (sb *singleShardBatcher) consume(ctx context.Context, data any) error {
+	return sb.batcher.consumeAndWait(ctx, data)
+}
+
 func (sb *singleShardBatcher) currentMetadataCardinality() int {
 	return 1
 }
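
The TODO inside `consumeAndWait` above points at a planned inflight byte limit. One possible shape of that idea, sketched with `golang.org/x/sync/semaphore` (hypothetical, not part of this PR; `maxInFlightBytes` is an assumed configuration value):

```go
package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/semaphore"
)

const maxInFlightBytes = 1 << 20 // assumed configuration value

var (
	inFlight = semaphore.NewWeighted(maxInFlightBytes)
	wg       sync.WaitGroup
)

// send blocks until `size` bytes of capacity are free, then exports
// asynchronously, releasing the capacity when the export finishes.
func send(ctx context.Context, size int64, export func() error) error {
	if err := inFlight.Acquire(ctx, size); err != nil {
		return err // context canceled while waiting for capacity
	}
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer inFlight.Release(size)
		_ = export()
	}()
	return nil
}

func main() {
	err := send(context.Background(), 512, func() error {
		fmt.Println("exporting 512 bytes")
		return nil
	})
	fmt.Println("enqueued:", err == nil)
	wg.Wait()
}
```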
@@ -414,7 +426,6 @@ type multiShardBatcher struct {
 }

 func (mb *multiShardBatcher) consume(ctx context.Context, data any) error {
-	respCh := make(chan error, 1)
 	// Get each metadata key value, form the corresponding
 	// attribute set for use as a map lookup key.
 	info := client.FromContext(ctx)
@@ -452,36 +463,7 @@ func (mb *multiShardBatcher) consume(ctx context.Context, data any) error {
 		mb.lock.Unlock()
 	}

-	item := dataItem{
-		data:       data,
-		responseCh: respCh,
-	}
-	select {
-	case <-ctx.Done():
-		return ctx.Err()
-	case b.(*shard).newItem <- item:
-	}
-
-	var err error
-	for {
-		select {
-		case newErr := <-respCh:
-			// nil response might be wrapped as an error.
-			if errors.Unwrap(newErr) != nil {
-				err = multierr.Append(err, newErr)
-			}
-
-			if isPartialError(newErr) {
-				continue
-			}
-
-			return err
-		case <-ctx.Done():
-			err = multierr.Append(err, ctx.Err())
-			return err
-		}
-	}
-	return nil
+	return b.(*shard).consumeAndWait(ctx, data)
 }

func recordBatchError(err error) error {
@@ -548,7 +530,12 @@ func (bt *batchTraces) add(item any) {
 	td.ResourceSpans().MoveAndAppendTo(bt.traceData.ResourceSpans())
 }

-func (bt *batchTraces) export(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, error) {
+func (bt *batchTraces) export(ctx context.Context, req any) error {
+	td := req.(ptrace.Traces)
+	return bt.nextConsumer.ConsumeTraces(ctx, td)
+}
+
+func (bt *batchTraces) splitBatch(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, any) {
 	var req ptrace.Traces
 	var sent int
 	var bytes int
@@ -565,7 +552,7 @@ func (bt *batchTraces) export(ctx context.Context, sendBatchMaxSize int, returnB
 	if returnBytes {
 		bytes = bt.sizer.TracesSize(req)
 	}
-	return sent, bytes, bt.nextConsumer.ConsumeTraces(ctx, req)
+	return sent, bytes, req
 }

 func (bt *batchTraces) itemCount() int {
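
The per-signal pattern above repeats for metrics and logs below: the old one-shot `export` is split into a synchronous `splitBatch`, which carves off up to `sendBatchMaxSize` items on the shard goroutine, and an `export` that can then run on its own goroutine. A toy sketch of the two-step contract (illustrative `intBatch` type; bytes accounting omitted):

```go
package main

import (
	"context"
	"fmt"
)

// batch mirrors the shape of the new interface, simplified.
type batch interface {
	splitBatch(ctx context.Context, sendBatchMaxSize int) (sent int, req any)
	export(ctx context.Context, req any) error
}

// intBatch is a toy implementation batching plain ints.
type intBatch struct{ items []int }

func (b *intBatch) splitBatch(_ context.Context, maxSize int) (int, any) {
	n := maxSize
	if n > len(b.items) {
		n = len(b.items)
	}
	req := b.items[:n]
	b.items = b.items[n:]
	return n, req
}

func (b *intBatch) export(_ context.Context, req any) error {
	fmt.Println("exported", req.([]int))
	return nil
}

func main() {
	var b batch = &intBatch{items: []int{1, 2, 3, 4, 5}}
	// Split synchronously, then export on a separate goroutine.
	sent, req := b.splitBatch(context.Background(), 2)
	done := make(chan error, 1)
	go func() { done <- b.export(context.Background(), req) }()
	fmt.Println("sent:", sent, "export err:", <-done)
}
```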
@@ -583,7 +570,12 @@ func newBatchMetrics(nextConsumer consumer.Metrics) *batchMetrics {
 	return &batchMetrics{nextConsumer: nextConsumer, metricData: pmetric.NewMetrics(), sizer: &pmetric.ProtoMarshaler{}}
 }

-func (bm *batchMetrics) export(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, error) {
+func (bm *batchMetrics) export(ctx context.Context, req any) error {
+	md := req.(pmetric.Metrics)
+	return bm.nextConsumer.ConsumeMetrics(ctx, md)
+}
+
+func (bm *batchMetrics) splitBatch(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, any) {
 	var req pmetric.Metrics
 	var sent int
 	var bytes int
@@ -597,10 +589,11 @@ func (bm *batchMetrics) export(ctx context.Context, sendBatchMaxSize int, return
 		bm.metricData = pmetric.NewMetrics()
 		bm.dataPointCount = 0
 	}
+
 	if returnBytes {
 		bytes = bm.sizer.MetricsSize(req)
 	}
-	return sent, bytes, bm.nextConsumer.ConsumeMetrics(ctx, req)
+	return sent, bytes, req
 }

 func (bm *batchMetrics) itemCount() int {
@@ -629,7 +622,12 @@ func newBatchLogs(nextConsumer consumer.Logs) *batchLogs {
 	return &batchLogs{nextConsumer: nextConsumer, logData: plog.NewLogs(), sizer: &plog.ProtoMarshaler{}}
 }

-func (bl *batchLogs) export(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, error) {
+func (bl *batchLogs) export(ctx context.Context, req any) error {
+	ld := req.(plog.Logs)
+	return bl.nextConsumer.ConsumeLogs(ctx, ld)
+}
+
+func (bl *batchLogs) splitBatch(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, any) {
 	var req plog.Logs
 	var sent int
 	var bytes int
@@ -647,7 +645,7 @@ func (bl *batchLogs) export(ctx context.Context, sendBatchMaxSize int, returnByt
 	if returnBytes {
 		bytes = bl.sizer.LogsSize(req)
 	}
-	return sent, bytes, bl.nextConsumer.ConsumeLogs(ctx, req)
+	return sent, bytes, req
 }

 func (bl *batchLogs) itemCount() int {
@@ -1,7 +1,7 @@
 // Copyright The OpenTelemetry Authors
 // SPDX-License-Identifier: Apache-2.0

-package batchprocessor
+package concurrentbatchprocessor

 import (
 	"context"
@@ -15,7 +15,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"

-	"github.com/open-telemetry/otel-arrow/collector/processor/batchprocessor/testdata"
+	"github.com/open-telemetry/otel-arrow/collector/processor/concurrentbatchprocessor/testdata"
 	"go.opentelemetry.io/collector/client"
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/component/componenttest"
@@ -505,7 +505,8 @@ func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) {

 	batchMetrics.add(md)
 	require.Equal(t, dataPointsPerMetric*metricsCount, batchMetrics.dataPointCount)
-	sent, _, sendErr := batchMetrics.export(ctx, sendBatchMaxSize, true)
+	sent, _, req := batchMetrics.splitBatch(ctx, sendBatchMaxSize, true)
+	sendErr := batchMetrics.export(ctx, req)
 	require.NoError(t, sendErr)
 	require.Equal(t, sendBatchMaxSize, sent)
 	remainingDataPointCount := metricsCount*dataPointsPerMetric - sendBatchMaxSize
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

-package batchprocessor // import "github.com/open-telemetry/otel-arrow/collector/processor/batchprocessor"
+package concurrentbatchprocessor // import "github.com/open-telemetry/otel-arrow/collector/processor/concurrentbatchprocessor"

 import (
 	"errors"
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

-package batchprocessor
+package concurrentbatchprocessor

 import (
 	"path/filepath"
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

-package batchprocessor // import "github.com/open-telemetry/otel-arrow/collector/processor/batchprocessor"
+package concurrentbatchprocessor // import "github.com/open-telemetry/otel-arrow/collector/processor/concurrentbatchprocessor"

 import (
 	"context"
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

-package batchprocessor
+package concurrentbatchprocessor

 import (
 	"context"
