Skip to content

Commit

Permalink
add: Error Handling Environment Bundle to Bento configuration (#174)
Browse files Browse the repository at this point in the history
  • Loading branch information
gregfurman authored Dec 6, 2024
1 parent 86e8d2b commit cfeec19
Show file tree
Hide file tree
Showing 11 changed files with 407 additions and 8 deletions.
28 changes: 28 additions & 0 deletions internal/bundle/strict/bundle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package strict

import (
"github.com/warpstreamlabs/bento/internal/bundle"
"github.com/warpstreamlabs/bento/internal/component/processor"
)

// StrictBundle modifies a provided bundle environment so that all procesors
// will fail an entire batch if any any message-level error is encountered. These
// failed batches are nacked and/or reprocessed depending on your input.
func StrictBundle(b *bundle.Environment) *bundle.Environment {
strictEnv := b.Clone()

for _, spec := range b.ProcessorDocs() {
_ = strictEnv.ProcessorAdd(func(conf processor.Config, nm bundle.NewManagement) (processor.V1, error) {
proc, err := b.ProcessorInit(conf, nm)
if err != nil {
return nil, err
}
proc = wrapWithStrict(proc)
return proc, err
}, spec)
}

// TODO: Overwrite inputs for retry with backoff

return strictEnv
}
100 changes: 100 additions & 0 deletions internal/bundle/strict/bundle_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package strict_test

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/warpstreamlabs/bento/internal/bundle"
"github.com/warpstreamlabs/bento/internal/bundle/strict"
"github.com/warpstreamlabs/bento/internal/component/testutil"
"github.com/warpstreamlabs/bento/internal/manager"
"github.com/warpstreamlabs/bento/internal/message"

_ "github.com/warpstreamlabs/bento/internal/impl/pure"
)

func TestStrictBundleProcessor(t *testing.T) {
senv := strict.StrictBundle(bundle.GlobalEnvironment)
tCtx := context.Background()

pConf, err := testutil.ProcessorFromYAML(`
bloblang: root = this
`)
require.NoError(t, err)

mgr, err := manager.New(
manager.ResourceConfig{},
manager.OptSetEnvironment(senv),
)
require.NoError(t, err)

proc, err := mgr.NewProcessor(pConf)
require.NoError(t, err)

msg := message.QuickBatch([][]byte{[]byte("not a structured doc")})
msgs, res := proc.ProcessBatch(tCtx, msg)
require.Empty(t, msgs)
require.Error(t, res)
assert.ErrorContains(t, res, "invalid character 'o' in literal null (expecting 'u')")

msg = message.QuickBatch([][]byte{[]byte(`{"hello":"world"}`)})
msgs, res = proc.ProcessBatch(tCtx, msg)
require.NoError(t, res)
require.Len(t, msgs, 1)
assert.Equal(t, 1, msgs[0].Len())
assert.Equal(t, `{"hello":"world"}`, string(msgs[0].Get(0).AsBytes()))
}

func TestStrictBundleProcessorMultiMessage(t *testing.T) {
senv := strict.StrictBundle(bundle.GlobalEnvironment)
tCtx := context.Background()

pConf, err := testutil.ProcessorFromYAML(`
bloblang: root = this
`)
require.NoError(t, err)

mgr, err := manager.New(
manager.ResourceConfig{},
manager.OptSetEnvironment(senv),
)
require.NoError(t, err)

proc, err := mgr.NewProcessor(pConf)
require.NoError(t, err)

msg := message.QuickBatch([][]byte{
[]byte("not a structured doc"),
[]byte(`{"foo":"oof"}`),
[]byte(`{"bar":"rab"}`),
})
msgs, res := proc.ProcessBatch(tCtx, msg)
require.Empty(t, msgs)
require.Error(t, res)
assert.ErrorContains(t, res, "invalid character 'o' in literal null (expecting 'u')")

// Ensure the ordering of the message does not influence the error message
msg = message.QuickBatch([][]byte{
[]byte(`{"foo":"oof"}`),
[]byte("not a structured doc"),
[]byte(`{"bar":"rab"}`),
})
msgs, res = proc.ProcessBatch(tCtx, msg)
require.Empty(t, msgs)
require.Error(t, res)
assert.ErrorContains(t, res, "invalid character 'o' in literal null (expecting 'u')")

// Multiple errored messages
msg = message.QuickBatch([][]byte{
[]byte(`{"foo":"oof"}`),
[]byte("not a structured doc"),
[]byte(`another unstructred doc`),
})
msgs, res = proc.ProcessBatch(tCtx, msg)
require.Empty(t, msgs)
require.Error(t, res)
assert.ErrorContains(t, res, "invalid character 'o' in literal null (expecting 'u')")
}
66 changes: 66 additions & 0 deletions internal/bundle/strict/processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package strict

import (
"context"

"github.com/warpstreamlabs/bento/internal/batch"
iprocessor "github.com/warpstreamlabs/bento/internal/component/processor"
"github.com/warpstreamlabs/bento/internal/message"
)

func wrapWithStrict(p iprocessor.V1) iprocessor.V1 {
t := &strictProcessor{
wrapped: p,
enabled: true,
}
return t
}

//------------------------------------------------------------------------------

// strictProcessor fails batch processing if any message contains an error.
type strictProcessor struct {
wrapped iprocessor.V1
enabled bool
}

func (s *strictProcessor) ProcessBatch(ctx context.Context, b message.Batch) ([]message.Batch, error) {
if !s.enabled {
return s.wrapped.ProcessBatch(ctx, b)
}

batches, err := s.wrapped.ProcessBatch(ctx, b)
if err != nil {
return nil, err
}

// Iterate through all messages and populate a batch.Error type, calling Failed()
// for each errored message. Otherwise, every message in the batch is treated as a failure.
for _, msg := range batches {
var batchErr *batch.Error
_ = msg.Iter(func(i int, p *message.Part) error {
mErr := p.ErrorGet()
if mErr == nil {
return nil
}
if batchErr == nil {
batchErr = batch.NewError(msg, mErr)
}
batchErr.Failed(i, mErr)
return nil
})
if batchErr != nil {
return nil, batchErr
}
}

return batches, nil
}

func (s *strictProcessor) Close(ctx context.Context) error {
return s.wrapped.Close(ctx)
}

func (s *strictProcessor) UnwrapProc() iprocessor.V1 {
return s.wrapped
}
89 changes: 89 additions & 0 deletions internal/bundle/strict/processor_strict_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package strict

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/warpstreamlabs/bento/internal/message"
)

//------------------------------------------------------------------------------

type mockProc struct{}

func (m mockProc) ProcessBatch(ctx context.Context, msg message.Batch) ([]message.Batch, error) {
for _, m := range msg {
_, err := m.AsStructuredMut()
m.ErrorSet(err)
}
return []message.Batch{msg}, nil
}

func (m mockProc) Close(ctx context.Context) error {
// Do nothing as our processor doesn't require resource cleanup.
return nil
}

//------------------------------------------------------------------------------

func TestProcessorWrapWithStrict(t *testing.T) {
tCtx := context.Background()

// Wrap the processor with the strict interface
strictProc := wrapWithStrict(mockProc{})

msg := message.QuickBatch([][]byte{[]byte("not a structured doc")})
msgs, res := strictProc.ProcessBatch(tCtx, msg)
require.Empty(t, msgs)
require.Error(t, res)
assert.EqualError(t, res, "invalid character 'o' in literal null (expecting 'u')")

msg = message.QuickBatch([][]byte{[]byte(`{"hello":"world"}`)})
msgs, res = strictProc.ProcessBatch(tCtx, msg)
require.NoError(t, res)
require.Len(t, msgs, 1)
assert.Equal(t, 1, msgs[0].Len())
assert.Equal(t, `{"hello":"world"}`, string(msgs[0].Get(0).AsBytes()))
}

func TestProcessorWrapWithStrictMultiMessage(t *testing.T) {
tCtx := context.Background()

// Wrap the processor with the strict interface
strictProc := wrapWithStrict(mockProc{})

msg := message.QuickBatch([][]byte{
[]byte("not a structured doc"),
[]byte(`{"foo":"oof"}`),
[]byte(`{"bar":"rab"}`),
})
msgs, res := strictProc.ProcessBatch(tCtx, msg)
require.Empty(t, msgs)
require.Error(t, res)
assert.Error(t, res, "invalid character 'o' in literal null (expecting 'u')")

// Ensure the ordering of the message does not influence the error message
msg = message.QuickBatch([][]byte{
[]byte(`{"foo":"oof"}`),
[]byte("not a structured doc"),
[]byte(`{"bar":"rab"}`),
})
msgs, res = strictProc.ProcessBatch(tCtx, msg)
require.Empty(t, msgs)
require.Error(t, res)
assert.Error(t, res, "invalid character 'o' in literal null (expecting 'u')")

// Multiple errored messages
msg = message.QuickBatch([][]byte{
[]byte(`{"foo":"oof"}`),
[]byte("not a structured doc"),
[]byte(`another unstructred doc`),
})
msgs, res = strictProc.ProcessBatch(tCtx, msg)
require.Empty(t, msgs)
require.Error(t, res)
assert.Error(t, res, "invalid character 'o' in literal null (expecting 'u')")
}
6 changes: 6 additions & 0 deletions internal/cli/common/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/warpstreamlabs/bento/internal/api"
"github.com/warpstreamlabs/bento/internal/bundle"
"github.com/warpstreamlabs/bento/internal/bundle/strict"
"github.com/warpstreamlabs/bento/internal/component/metrics"
"github.com/warpstreamlabs/bento/internal/config"
"github.com/warpstreamlabs/bento/internal/docs"
Expand Down Expand Up @@ -104,6 +105,11 @@ func CreateManager(
manager.OptSetStreamsMode(streamsMode),
}, mgrOpts...)

// Initialise processors with global error handling strategy
if conf.ErrorHandling.Strategy == "reject" {
mgrOpts = append(mgrOpts, manager.OptSetEnvironment(strict.StrictBundle(bundle.GlobalEnvironment)))
}

// Create resource manager.
var mgr *manager.Type
if mgr, err = manager.New(conf.ResourceConfig, mgrOpts...); err != nil {
Expand Down
32 changes: 26 additions & 6 deletions internal/config/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"github.com/warpstreamlabs/bento/internal/component/tracer"
"github.com/warpstreamlabs/bento/internal/config/test"
"github.com/warpstreamlabs/bento/internal/docs"
"github.com/warpstreamlabs/bento/internal/errorhandling"

"github.com/warpstreamlabs/bento/internal/log"
"github.com/warpstreamlabs/bento/internal/manager"
"github.com/warpstreamlabs/bento/internal/stream"
Expand All @@ -17,6 +19,7 @@ const (
fieldLogger = "logger"
fieldMetrics = "metrics"
fieldTracer = "tracer"
fieldErrorHandling = "error_handling"
fieldSystemCloseDelay = "shutdown_delay"
fieldSystemCloseTimeout = "shutdown_timeout"
fieldTests = "tests"
Expand All @@ -27,12 +30,13 @@ type Type struct {
HTTP api.Config `yaml:"http"`
stream.Config `yaml:",inline"`
manager.ResourceConfig `yaml:",inline"`
Logger log.Config `yaml:"logger"`
Metrics metrics.Config `yaml:"metrics"`
Tracer tracer.Config `yaml:"tracer"`
SystemCloseDelay string `yaml:"shutdown_delay"`
SystemCloseTimeout string `yaml:"shutdown_timeout"`
Tests []any `yaml:"tests"`
Logger log.Config `yaml:"logger"`
Metrics metrics.Config `yaml:"metrics"`
Tracer tracer.Config `yaml:"tracer"`
ErrorHandling errorhandling.Config `yaml:"error_handling"`
SystemCloseDelay string `yaml:"shutdown_delay"`
SystemCloseTimeout string `yaml:"shutdown_timeout"`
Tests []any `yaml:"tests"`

rawSource any
}
Expand Down Expand Up @@ -62,12 +66,19 @@ func observabilityFields() docs.FieldSpecs {
}
}

func errorHandlingFields() docs.FieldSpecs {
return docs.FieldSpecs{
docs.FieldObject(fieldErrorHandling, "Environment-wide settings for handling errored messages.").WithChildren(errorhandling.Spec()...),
}
}

// Spec returns a docs.FieldSpec for an entire Bento configuration.
func Spec() docs.FieldSpecs {
fields := docs.FieldSpecs{httpField}
fields = append(fields, stream.Spec()...)
fields = append(fields, manager.Spec()...)
fields = append(fields, observabilityFields()...)
fields = append(fields, errorHandlingFields()...)
fields = append(fields, test.ConfigSpec().Advanced())
return fields
}
Expand Down Expand Up @@ -140,6 +151,15 @@ func noStreamFromParsed(prov docs.Provider, pConf *docs.ParsedConfig, conf *Type
return
}
}

if pConf.Contains(fieldErrorHandling) {
if conf.ErrorHandling, err = errorhandling.FromParsed(pConf.Namespace(fieldErrorHandling)); err != nil {
return
}
} else {
conf.ErrorHandling = errorhandling.NewConfig()
}

if pConf.Contains(fieldTests) {
var tmpTests []*docs.ParsedConfig
if tmpTests, err = pConf.FieldAnyList(fieldTests); err != nil {
Expand Down
Loading

0 comments on commit cfeec19

Please sign in to comment.