Refactor trie update processing in Compactor
fxamacker committed Jul 21, 2022
1 parent c053aaf commit 65f2062
Showing 1 changed file with 81 additions and 61 deletions.
142 changes: 81 additions & 61 deletions ledger/complete/compactor.go
@@ -177,79 +177,30 @@ Loop:
continue
}

// RecordUpdate returns the segment number the record was written to.
// Returned segment number (>= 0) can be
// - the same as previous segment number (same segment), or
// - incremented by 1 from previous segment number (new segment)
segmentNum, skipped, err := c.wal.RecordUpdate(update.Update)

if activeSegmentNum == -1 {
// Recover from failure to get active segment number outside loop
activeSegmentNum = segmentNum
if activeSegmentNum > nextCheckpointNum {
var checkpointNum int
var checkpointTries []*trie.MTrie
activeSegmentNum, checkpointNum, checkpointTries =
c.processTrieUpdate(update, activeSegmentNum, nextCheckpointNum)

if checkpointTries == nil {
// Don't checkpoint yet because
// - not enough segments for checkpointing (nextCheckpointNum >= activeSegmentNum), or
// - failed to get ledger state snapshot (nextCheckpointNum < activeSegmentNum)
if nextCheckpointNum < activeSegmentNum {
nextCheckpointNum = activeSegmentNum
}
}

if err != nil || skipped || segmentNum == activeSegmentNum {
processUpdateResult(update, err)
continue
}

// In the remaining code: segmentNum > activeSegmentNum

// active segment is finalized.

// Check new segment number is incremented by 1
if segmentNum != activeSegmentNum+1 {
c.logger.Error().Msg(fmt.Sprintf("compactor got unexpected new segment number %d, want %d", segmentNum, activeSegmentNum+1))
}

// Update activeSegmentNum
prevSegmentNum := activeSegmentNum
activeSegmentNum = segmentNum

if nextCheckpointNum > prevSegmentNum {
// Not enough segments for checkpointing
processUpdateResult(update, nil)
continue
}

// In the remaining code: nextCheckpointNum == prevSegmentNum

// Enough segments are created for checkpointing

// Get ledger snapshot before sending WAL update result.
// At this point, ledger snapshot contains tries up to
// last update (logged as last record in finalized segment)
// Ledger doesn't include new trie for this update
// until WAL result is sent back.
tries, err := c.ledger.Tries()
if err != nil {
c.logger.Error().Err(err).Msg("compactor failed to get ledger tries")
// Try again after active segment is finalized.
nextCheckpointNum = activeSegmentNum

processUpdateResult(update, nil)
continue
}

// Send WAL update result after ledger state snapshot is taken.
processUpdateResult(update, nil)

// Try to checkpoint
if checkpointSem.TryAcquire(1) {

checkpointNum := nextCheckpointNum

// Compute next checkpoint number
nextCheckpointNum += int(c.checkpointDistance)
nextCheckpointNum = checkpointNum + int(c.checkpointDistance)

go func() {
defer checkpointSem.Release(1)

err := c.checkpoint(ctx, tries, checkpointNum)

err := c.checkpoint(ctx, checkpointTries, checkpointNum)
checkpointResultCh <- checkpointResult{checkpointNum, err}
}()
} else {
@@ -371,3 +322,72 @@ func cleanupCheckpoints(checkpointer *realWAL.Checkpointer, checkpointsToKeep in
}
return nil
}

// processTrieUpdate writes trie update to WAL, updates activeSegmentNum,
// and takes snapshot of ledger state for checkpointing if needed.
// It also sends WAL update result and waits for trie update completion.
func (c *Compactor) processTrieUpdate(
update *WALTrieUpdate,
activeSegmentNum int,
nextCheckpointNum int,
) (
_activeSegmentNum int,
checkpointNum int,
checkpointTries []*trie.MTrie,
) {

// RecordUpdate returns the segment number the record was written to.
// Returned segment number (>= 0) can be
// - the same as previous segment number (same segment), or
// - incremented by 1 from previous segment number (new segment)
segmentNum, skipped, updateErr := c.wal.RecordUpdate(update.Update)

// processUpdateResult must be called to ensure that ledger state update isn't blocked.
defer processUpdateResult(update, updateErr)

if activeSegmentNum == -1 {
// Recover from failure to get active segment number at initialization.
return segmentNum, -1, nil
}

if updateErr != nil || skipped || segmentNum == activeSegmentNum {
return activeSegmentNum, -1, nil
}

// In the remaining code: segmentNum > activeSegmentNum

// active segment is finalized.

// Check new segment number is incremented by 1
if segmentNum != activeSegmentNum+1 {
c.logger.Error().Msg(fmt.Sprintf("compactor got unexpected new segment number %d, want %d", segmentNum, activeSegmentNum+1))
}

// Update activeSegmentNum
prevSegmentNum := activeSegmentNum
activeSegmentNum = segmentNum

if nextCheckpointNum > prevSegmentNum {
// Not enough segments for checkpointing
return activeSegmentNum, -1, nil
}

// In the remaining code: nextCheckpointNum == prevSegmentNum

// Enough segments are created for checkpointing

// Get ledger snapshot before sending WAL update result.
// At this point, ledger snapshot contains tries up to
// last update (logged as last record in finalized segment)
// Ledger doesn't include new trie for this update
// until WAL result is sent back.
tries, err := c.ledger.Tries()
if err != nil {
c.logger.Error().Err(err).Msg("compactor failed to get ledger tries")
return activeSegmentNum, -1, nil
}

checkpointNum = nextCheckpointNum

return activeSegmentNum, checkpointNum, tries
}
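
For readers skimming the diff, the contract of the new return values is: checkpointTries == nil means "don't checkpoint yet" (the caller only lets nextCheckpointNum catch up with the active segment, as in the first hunk), while a non-nil slice means a ledger snapshot was taken for segment checkpointNum and the next checkpoint target advances by checkpointDistance. Below is a minimal, self-contained sketch of that caller-side logic; mtrie and scheduleCheckpoint are hypothetical stand-ins invented for illustration only (the real caller is the run loop shown above, using []*trie.MTrie and int(c.checkpointDistance)).

package main

import "fmt"

// mtrie is a hypothetical stand-in for *trie.MTrie so this sketch compiles
// on its own; the real code works with []*trie.MTrie.
type mtrie struct{ root string }

// scheduleCheckpoint mirrors the caller-side contract of processTrieUpdate:
// a nil checkpointTries means "don't checkpoint yet", so the caller only
// lets nextCheckpointNum catch up with the active segment; a non-nil slice
// means a snapshot was taken for segment checkpointNum, so checkpointing
// should start and the next target advances by checkpointDistance.
func scheduleCheckpoint(
	activeSegmentNum int,
	checkpointNum int,
	checkpointTries []*mtrie,
	nextCheckpointNum int,
	checkpointDistance int,
) (newNextCheckpointNum int, checkpointNow bool) {
	if checkpointTries == nil {
		if nextCheckpointNum < activeSegmentNum {
			nextCheckpointNum = activeSegmentNum
		}
		return nextCheckpointNum, false
	}
	return checkpointNum + checkpointDistance, true
}

func main() {
	// Not enough segments yet: processTrieUpdate returned (5, -1, nil).
	fmt.Println(scheduleCheckpoint(5, -1, nil, 7, 10)) // 7 false

	// Snapshot failed after the active segment passed the old target:
	// the target catches up so checkpointing is retried later.
	fmt.Println(scheduleCheckpoint(9, -1, nil, 7, 10)) // 9 false

	// Snapshot taken for segment 7: checkpoint now, next target is 17.
	fmt.Println(scheduleCheckpoint(8, 7, []*mtrie{{root: "r"}}, 7, 10)) // 17 true
}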
