Skip to content

Commit

Permalink
small adjustments to fatal errors (#1830)
Browse files Browse the repository at this point in the history
  • Loading branch information
lovromazgon authored Sep 5, 2024
1 parent 7e4b3be commit 7e7bb8b
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 25 deletions.
13 changes: 8 additions & 5 deletions pkg/foundation/cerrors/fatal.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,13 @@ type fatalError struct {
}

// FatalError creates a new fatalError.
func FatalError(err error) *fatalError {
func FatalError(err error) error {
if err == nil {
return nil
}
if IsFatalError(err) {
return err // already a fatal error
}
return &fatalError{Err: err}
}

Expand All @@ -35,10 +41,7 @@ func (f *fatalError) Unwrap() error {

// Error returns the error message.
func (f *fatalError) Error() string {
if f.Err == nil {
return ""
}
return fmt.Sprintf("fatal error: %v", f.Err)
return fmt.Sprintf("fatal error: %s", f.Err.Error())
}

// IsFatalError checks if the error is a fatalError.
Expand Down
28 changes: 17 additions & 11 deletions pkg/foundation/cerrors/fatal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,17 @@ import (
"github.com/matryer/is"
)

func TestNewFatalError(t *testing.T) {
func TestFatalError(t *testing.T) {
is := is.New(t)

err := cerrors.New("test error")

// wrapping the error multiple times should not change the error message
fatalErr := cerrors.FatalError(err)
fatalErr = cerrors.FatalError(fatalErr)
fatalErr = cerrors.FatalError(fatalErr)
fatalErr = cerrors.FatalError(fatalErr)

wantErr := fmt.Sprintf("fatal error: %v", err)

is.Equal(fatalErr.Error(), wantErr)
Expand Down Expand Up @@ -56,6 +62,16 @@ func TestIsFatalError(t *testing.T) {
err: err,
want: false,
},
{
name: "when it's nil",
err: nil,
want: false,
},
{
name: "when underlying is nil",
err: cerrors.FatalError(nil),
want: false,
},
}

for _, tc := range testCases {
Expand All @@ -74,13 +90,3 @@ func TestUnwrap(t *testing.T) {

is.Equal(cerrors.Unwrap(fatalErr), err)
}

func TestFatalError(t *testing.T) {
is := is.New(t)

err := cerrors.New("test error")
fatalErr := cerrors.FatalError(err)
wantErr := fmt.Sprintf("fatal error: %v", err)

is.Equal(fatalErr.Error(), wantErr)
}
23 changes: 16 additions & 7 deletions pkg/pipeline/stream/dlq.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (n *DLQHandlerNode) ID() string {

// Run runs the DLQ handler node until all components depending on this node
// call Done. Dependents can be added or removed while the node is running.
func (n *DLQHandlerNode) Run(ctx context.Context) (err error) {
func (n *DLQHandlerNode) Run(ctx context.Context) error {
defer n.state.Set(nodeStateStopped)
n.window = newDLQWindow(n.WindowSize, n.WindowNackThreshold)

Expand All @@ -78,7 +78,7 @@ func (n *DLQHandlerNode) Run(ctx context.Context) (err error) {
handlerCtx, n.handlerCtxCancel = context.WithCancel(context.Background())
defer n.handlerCtxCancel()

err = n.Handler.Open(handlerCtx)
err := n.Handler.Open(handlerCtx)
if err != nil {
return cerrors.Errorf("could not open DLQ handler: %w", err)
}
Expand Down Expand Up @@ -128,7 +128,7 @@ func (n *DLQHandlerNode) Ack(msg *Message) {
n.window.Ack()
}

func (n *DLQHandlerNode) Nack(msg *Message, nackMetadata NackMetadata) (err error) {
func (n *DLQHandlerNode) Nack(msg *Message, nackMetadata NackMetadata) error {
state, err := n.state.Watch(msg.Ctx,
csync.WatchValues(nodeStateRunning, nodeStateStopped, dlqHandlerNodeStateBroken))
if err != nil {
Expand All @@ -146,10 +146,19 @@ func (n *DLQHandlerNode) Nack(msg *Message, nackMetadata NackMetadata) (err erro

ok := n.window.Nack()
if !ok {
return cerrors.FatalError(cerrors.Errorf(
"DLQ nack threshold exceeded (%d/%d), original error: %w",
n.WindowNackThreshold, n.WindowSize, nackMetadata.Reason,
))
if n.WindowNackThreshold > 0 {
// if the threshold is greater than 0 the DLQ is enabled and we
// need to respect the threshold by stopping the pipeline with a
// fatal error
return cerrors.FatalError(
cerrors.Errorf(
"DLQ nack threshold exceeded (%d/%d), original error: %w",
n.WindowNackThreshold, n.WindowSize, nackMetadata.Reason,
),
)
}
// DLQ is disabled, we don't need to wrap the error message
return nackMetadata.Reason
}

defer func() {
Expand Down
10 changes: 8 additions & 2 deletions pkg/pipeline/stream/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ func (n *ProcessorNode) Run(ctx context.Context) error {
// make sure that we ack as many records as possible
// (here we simply nack all of them, which is always only one)
if nackErr := msg.Nack(err, n.ID()); nackErr != nil {
return nackErr
return cerrors.FatalError(nackErr)
}
return err
return cerrors.FatalError(err)
}

switch v := recsOut[0].(type) {
Expand All @@ -114,6 +114,12 @@ func (n *ProcessorNode) Run(ctx context.Context) error {
if err != nil {
return cerrors.FatalError(cerrors.Errorf("error executing processor: %w", err))
}
default:
err := cerrors.Errorf("processor returned unknown record type: %T", v)
if nackErr := msg.Nack(err, n.ID()); nackErr != nil {
return cerrors.FatalError(nackErr)
}
return cerrors.FatalError(err)
}
}
}
Expand Down

0 comments on commit 7e7bb8b

Please sign in to comment.