Skip to content

Commit

Permalink
Replay events during restart to avoid tx missing (#211)
Browse files Browse the repository at this point in the history
  • Loading branch information
yzang2019 authored Mar 15, 2024
1 parent 4269298 commit 66ac407
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 4 deletions.
22 changes: 21 additions & 1 deletion internal/consensus/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,11 @@ func (h *Handshaker) ReplayBlocks(
return h.replayBlocks(ctx, state, appClient, appBlockHeight, storeBlockHeight, false)

} else if appBlockHeight == storeBlockHeight {
// We're good!
// We're good! But we need to reindex events
err := h.replayEvents(appBlockHeight)
if err != nil {
return nil, err
}
if err := checkAppHashEqualsOneFromState(appHash, state); err != nil {
return nil, err
}
Expand Down Expand Up @@ -550,6 +554,22 @@ func (h *Handshaker) replayBlock(
return state, nil
}

// replayEvents will be called during restart to avoid tx missing to be indexed
func (h *Handshaker) replayEvents(height int64) error {
block := h.store.LoadBlock(height)
meta := h.store.LoadBlockMeta(height)
res, err := h.stateStore.LoadFinalizeBlockResponses(height)
if err != nil {
return err
}
validatorUpdates, err := types.PB2TM.ValidatorUpdates(res.ValidatorUpdates)
if err != nil {
return err
}
sm.FireEvents(h.logger, h.eventBus, block, meta.BlockID, res, validatorUpdates)
return nil
}

func checkAppHashEqualsOneFromBlock(appHash []byte, block *types.Block) error {
if !bytes.Equal(appHash, block.AppHash) {
return fmt.Errorf(`block.AppHash does not match AppHash after replay. Got '%X', expected '%X'.
Expand Down
6 changes: 3 additions & 3 deletions internal/state/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func (blockExec *BlockExecutor) ApplyBlock(

// Events are fired after everything else.
// NOTE: if we crash between Commit and Save, events wont be fired during replay
fireEvents(blockExec.logger, blockExec.eventBus, block, blockID, fBlockRes, validatorUpdates)
FireEvents(blockExec.logger, blockExec.eventBus, block, blockID, fBlockRes, validatorUpdates)

return state, nil
}
Expand Down Expand Up @@ -687,7 +687,7 @@ func (state State) Update(
// Fire NewBlock, NewBlockHeader.
// Fire TxEvent for every tx.
// NOTE: if Tendermint crashes before commit, some or all of these events may be published again.
func fireEvents(
func FireEvents(
logger log.Logger,
eventBus types.BlockEventPublisher,
block *types.Block,
Expand Down Expand Up @@ -811,7 +811,7 @@ func ExecCommitBlock(
}

blockID := types.BlockID{Hash: block.Hash(), PartSetHeader: bps.Header()}
fireEvents(be.logger, be.eventBus, block, blockID, finalizeBlockResponse, validatorUpdates)
FireEvents(be.logger, be.eventBus, block, blockID, finalizeBlockResponse, validatorUpdates)
}

// Commit block
Expand Down

0 comments on commit 66ac407

Please sign in to comment.