Skip to content

Commit

Permalink
Only retry checkpointing on appropriate error
Browse files Browse the repository at this point in the history
Asynchronous checkpointing can return createCheckpointError and
removeCheckpointError.

Only retry checkpointing if createCheckpointError is returned.

Log and ignore removeCheckpointError.
  • Loading branch information
fxamacker committed Jul 31, 2022
1 parent 9f2ec46 commit 22eeb3e
Showing 1 changed file with 32 additions and 7 deletions.
39 changes: 32 additions & 7 deletions ledger/complete/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,14 @@ Loop:

case checkpointResult := <-checkpointResultCh:
if checkpointResult.err != nil {
c.logger.Error().Err(checkpointResult.err).Msgf(
"compactor failed to checkpoint %d", checkpointResult.num,
c.logger.Error().Err(checkpointResult.err).Msg(
"compactor failed to create or remove checkpoint",
)

// Retry checkpointing when active segment is finalized.
nextCheckpointNum = activeSegmentNum
var createError *createCheckpointError
if errors.As(checkpointResult.err, &createError) {
// Retry checkpointing when active segment is finalized.
nextCheckpointNum = activeSegmentNum
}
}

case update, ok := <-c.trieUpdateCh:
Expand Down Expand Up @@ -271,7 +273,7 @@ func (c *Compactor) checkpoint(ctx context.Context, tries []*trie.MTrie, checkpo

err := createCheckpoint(c.checkpointer, c.logger, tries, checkpointNum)
if err != nil {
return fmt.Errorf("cannot create checkpoints: %w", err)
return &createCheckpointError{num: checkpointNum, err: err}
}

// Return if context is canceled.
Expand All @@ -283,7 +285,7 @@ func (c *Compactor) checkpoint(ctx context.Context, tries []*trie.MTrie, checkpo

err = cleanupCheckpoints(c.checkpointer, int(c.checkpointsToKeep))
if err != nil {
return fmt.Errorf("cannot cleanup checkpoints: %w", err)
return &removeCheckpointError{err: err}
}

if checkpointNum > 0 {
Expand Down Expand Up @@ -438,3 +440,26 @@ func (c *Compactor) processTrieUpdate(

return activeSegmentNum, checkpointNum, tries
}

// createCheckpointError creates a checkpoint creation error.
type createCheckpointError struct {
num int
err error
}

func (e *createCheckpointError) Error() string {
return fmt.Sprintf("cannot create checkpoint %d: %s", e.num, e.err)
}

func (e *createCheckpointError) Unwrap() error { return e.err }

// removeCheckpointError creates a checkpoint removal error.
type removeCheckpointError struct {
err error
}

func (e *removeCheckpointError) Error() string {
return fmt.Sprintf("cannot cleanup checkpoints: %s", e.err)
}

func (e *removeCheckpointError) Unwrap() error { return e.err }

0 comments on commit 22eeb3e

Please sign in to comment.