Skip to content

Commit

Permalink
clarify error message on cursor resolution
Browse files Browse the repository at this point in the history
  • Loading branch information
sduchesneau committed Oct 11, 2022
1 parent 7a027bf commit 52a3776
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 4 deletions.
7 changes: 5 additions & 2 deletions cursor_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package bstream
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"strings"
Expand All @@ -11,6 +12,8 @@ import (
"go.uber.org/zap"
)

var ErrResolveCursor = errors.New("cannot resolve cursor")

// cursorResolver is a handler that feeds from a source of new+irreversible blocks (filesource)
// and keeps blocks in a slice until cursor is passed.
// when it sees the cursor, it sends whatever is needed to bring the consumer back to a "new and irreversible" head
Expand Down Expand Up @@ -177,11 +180,11 @@ func (f *cursorResolver) resolve(ctx context.Context) (undoBlocks []*Block, cont

forkedBlock := oneBlocks[previousID]
if forkedBlock == nil {
return nil, 0, fmt.Errorf("cannot resolve cursor pointing to block %s: missing link: no one-block-file or merged block found with ID %s", block, previousID)
return nil, 0, fmt.Errorf("%w: missing link between blocks %d and %s: no forked-block file found with ID ending with %s.", ErrResolveCursor, lib.Num(), block, previousID)
}

if forkedBlock.Num < lib.Num() {
return nil, 0, fmt.Errorf("cannot resolve cursor pointing to block %s: missing link: forked chain goes beyond LIB, looking for ID %s (this should not happens)", block, previousID)
return nil, 0, fmt.Errorf("%w: block %s not linkable to canonical chain above final block %d (looking for ID ending with %s)", ErrResolveCursor, block, lib.Num(), previousID)
}

previousID = forkedBlock.PreviousID
Expand Down
2 changes: 1 addition & 1 deletion filesource.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ func (s *FileSource) launchSink() {
}

if err := s.handler.ProcessBlock(preBlock.Block, preBlock.Obj); err != nil {
s.Shutdown(fmt.Errorf("process block failed: %w", err))
s.Shutdown(err)
return
}
if s.highestFileProcessedBlock != nil && preBlock.Num() > s.highestFileProcessedBlock.Num() {
Expand Down
4 changes: 4 additions & 0 deletions stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package stream

import (
"context"
"errors"
"fmt"

"github.com/streamingfast/bstream"
Expand Down Expand Up @@ -92,6 +93,9 @@ func (s *Stream) Run(ctx context.Context) error {
source.Run()
if err := source.Err(); err != nil {
s.logger.Debug("source shutting down", zap.Error(err))
if errors.Is(err, bstream.ErrResolveCursor) {
return &ErrInvalidArg{message: err.Error()}
}
return err
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion util.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (s *preMergeBlockSource) Run() {
s.Shutdown(err)
}
err = s.handler.ProcessBlock(nativeBlock, nil)
s.Shutdown(fmt.Errorf("process block failed: %w", err))
s.Shutdown(err)
}

s.Shutdown(nil)
Expand Down

0 comments on commit 52a3776

Please sign in to comment.