Skip to content

Commit

Permalink
(batchprocessor): block requests until response is received (#71)
Browse files Browse the repository at this point in the history
Part 2 for #80

This PR adds the batchprocessor to otel-arrow with a couple
enhancements:
- block requests until the batch response is received
- errors reported to producers; timeout will now report an error

The main files to review are `batch_processor.go` and
`batch_processor_test.go`
  • Loading branch information
moh-osman3 authored Nov 3, 2023
1 parent c6161e5 commit c384fcf
Show file tree
Hide file tree
Showing 8 changed files with 388 additions and 126 deletions.
187 changes: 174 additions & 13 deletions collector/processor/batchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"go.opentelemetry.io/otel/attribute"
"go.uber.org/multierr"
"go.uber.org/zap"

"go.opentelemetry.io/collector/client"
Expand All @@ -26,8 +27,10 @@ import (
"go.opentelemetry.io/collector/processor"
)

// errTooManyBatchers is returned when the MetadataCardinalityLimit has been reached.
var errTooManyBatchers = consumererror.NewPermanent(errors.New("too many batcher metadata-value combinations"))
// errTooManyBatchers is returned when the MetadataCardinalityLimit has been reached.
var errTooManyBatchers = consumererror.NewPermanent(errors.New("too many batcher metadata-value combinations"))

// batch_processor is a component that accepts spans and metrics, places them
// into batches and sends downstream.
Expand Down Expand Up @@ -86,11 +89,25 @@ type shard struct {
timer *time.Timer

// newItem is used to receive data items from producers.
newItem chan any
newItem chan dataItem

// batch is an in-flight data item containing one of the
// underlying data types.
batch batch

pending []pendingItem

totalSent int
}

// pendingItem tracks one producer's outstanding contribution to the
// in-flight batch so that a response can be delivered to the producer
// once its items have been exported.
type pendingItem struct {
	// numItems is the number of this producer's items still waiting in
	// the current batch; it is decremented as partial exports occur.
	numItems int
	// respCh receives the export result: a partialError while some of
	// the producer's items remain batched, then a completeError.
	respCh chan error
}

// dataItem carries a producer's data into a shard together with the
// channel on which the export result is reported back to the producer.
type dataItem struct {
	data       any
	responseCh chan error
}

// batch is an interface generalizing the individual signal types.
Expand All @@ -105,6 +122,44 @@ type batch interface {
add(item any)
}

// partialError is useful when a producer adds items that are split
// between multiple batches. This signals that producers should continue
// waiting until a completeError is received.
type partialError struct {
err error
}

func (pe partialError) Error() string {
if pe.err == nil {
return ""
}
return fmt.Sprintf("batch partial error: %s", pe.err.Error())
}

func (pe partialError) Unwrap() error {
return pe.err
}

func isPartialError(err error) bool {
return errors.Is(err, partialError{err: errors.Unwrap(err)})
}

type completeError struct {
err error
}

func (ce completeError) Error() string {
if ce.err == nil {
return ""
}
return fmt.Sprintf("batch complete error: %s", ce.err.Error())

}

func (ce completeError) Unwrap() error {
return ce.err
}

var _ consumer.Traces = (*batchProcessor)(nil)
var _ consumer.Metrics = (*batchProcessor)(nil)
var _ consumer.Logs = (*batchProcessor)(nil)
Expand Down Expand Up @@ -152,10 +207,11 @@ func (bp *batchProcessor) newShard(md map[string][]string) *shard {
})
b := &shard{
processor: bp,
newItem: make(chan any, runtime.NumCPU()),
newItem: make(chan dataItem, runtime.NumCPU()),
exportCtx: exportCtx,
batch: bp.batchFunc(),
}

b.processor.goroutines.Add(1)
go b.start()
return b
Expand Down Expand Up @@ -205,11 +261,11 @@ func (b *shard) start() {
if b.batch.itemCount() > 0 {
// TODO: Set a timeout on sendTraces or
// make it cancellable using the context that Shutdown gets as a parameter
b.sendItems(triggerTimeout)
b.sendItems(triggerShutdown)
}
return
case item := <-b.newItem:
if item == nil {
if item.data == nil {
continue
}
b.processItem(item)
Expand All @@ -222,12 +278,26 @@ func (b *shard) start() {
}
}

func (b *shard) processItem(item any) {
b.batch.add(item)
func (b *shard) processItem(item dataItem) {
before := b.batch.itemCount()
b.batch.add(item.data)
after := b.batch.itemCount()

totalItems := after - before
b.pending = append(b.pending, pendingItem{
numItems: totalItems,
respCh: item.responseCh,
})

b.flushItems()
}

func (b *shard) flushItems() {
sent := false

for b.batch.itemCount() > 0 && (!b.hasTimer() || b.batch.itemCount() >= b.processor.sendBatchSize) {
sent = true
b.sendItems(triggerBatchSize)
sent = true
}

if sent {
Expand All @@ -254,6 +324,31 @@ func (b *shard) resetTimer() {

func (b *shard) sendItems(trigger trigger) {
sent, bytes, err := b.batch.export(b.exportCtx, b.processor.sendBatchMaxSize, b.processor.telemetry.detailed)

numItemsBefore := b.totalSent
numItemsAfter := b.totalSent + sent
// The current batch can contain items from several different producers. Ensure each producer gets a response back.
for len(b.pending) > 0 && numItemsBefore < numItemsAfter {
// Waiter only had some items in the current batch
if numItemsBefore + b.pending[0].numItems > numItemsAfter {
b.pending[0].numItems -= (numItemsAfter - numItemsBefore)
b.pending[0].respCh <- partialError{err: err}
numItemsBefore = numItemsAfter
} else { // waiter gets a complete response.
numItemsBefore += b.pending[0].numItems
b.pending[0].respCh <- completeError{err: err}

// complete response sent so b.pending[0] can be popped from queue.
if len(b.pending) > 1 {
b.pending = b.pending[1:]
} else {
b.pending = []pendingItem{}
}
}
}

b.totalSent = numItemsAfter

if err != nil {
b.processor.logger.Warn("Sender failed", zap.Error(err))
} else {
Expand All @@ -267,8 +362,39 @@ type singleShardBatcher struct {
batcher *shard
}

func (sb *singleShardBatcher) consume(_ context.Context, data any) error {
sb.batcher.newItem <- data
func (sb *singleShardBatcher) consume(ctx context.Context, data any) error {
respCh := make(chan error, 1)
// TODO: add a semaphore to only write to channel if sizeof(data) keeps
// us below some configured inflight byte limit.
item := dataItem{
data: data,
responseCh: respCh,
}
select {
case <-ctx.Done():
return ctx.Err()
case sb.batcher.newItem <- item:
}
var err error

for {
select {
case newErr := <-respCh:
// nil response might be wrapped as an error.
if errors.Unwrap(newErr) != nil {
err = multierr.Append(err, newErr)
}

if isPartialError(newErr) {
continue
}

return err
case <-ctx.Done():
err = multierr.Append(err, ctx.Err())
return err
}
}
return nil
}

Expand All @@ -288,6 +414,7 @@ type multiShardBatcher struct {
}

func (mb *multiShardBatcher) consume(ctx context.Context, data any) error {
respCh := make(chan error, 1)
// Get each metadata key value, form the corresponding
// attribute set for use as a map lookup key.
info := client.FromContext(ctx)
Expand Down Expand Up @@ -324,10 +451,43 @@ func (mb *multiShardBatcher) consume(ctx context.Context, data any) error {
}
mb.lock.Unlock()
}
b.(*shard).newItem <- data

item := dataItem{
data: data,
responseCh: respCh,
}
select {
case <-ctx.Done():
return ctx.Err()
case b.(*shard).newItem <- item:
}

var err error
for {
select {
case newErr := <-respCh:
// nil response might be wrapped as an error.
if errors.Unwrap(newErr) != nil {
err = multierr.Append(err, newErr)
}

if isPartialError(newErr) {
continue
}

return err
case <-ctx.Done():
err = multierr.Append(err, ctx.Err())
return err
}
}
return nil
}

func recordBatchError(err error) error {
return fmt.Errorf("Batch contained errors: %w", err)
}

func (mb *multiShardBatcher) currentMetadataCardinality() int {
mb.lock.Lock()
defer mb.lock.Unlock()
Expand Down Expand Up @@ -378,6 +538,7 @@ func newBatchTraces(nextConsumer consumer.Traces) *batchTraces {
// add updates current batchTraces by adding new TraceData object
func (bt *batchTraces) add(item any) {
td := item.(ptrace.Traces)

newSpanCount := td.SpanCount()
if newSpanCount == 0 {
return
Expand Down Expand Up @@ -502,4 +663,4 @@ func (bl *batchLogs) add(item any) {
}
bl.logCount += newLogsCount
ld.ResourceLogs().MoveAndAppendTo(bl.logData.ResourceLogs())
}
}
Loading

0 comments on commit c384fcf

Please sign in to comment.