Skip to content

Commit

Permalink
Make finality an enum rather than a bool
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley committed Oct 25, 2023
1 parent f2b1a99 commit a7d2076
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 32 deletions.
2 changes: 1 addition & 1 deletion node/pkg/watchers/evm/connectors/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type NewBlock struct {
Number *big.Int
Hash common.Hash
L1BlockNumber *big.Int // This is only populated on some chains (Arbitrum)
Safe bool
Finality FinalityLevel
}

// Connector exposes Wormhole-specific interactions with an EVM-based network
Expand Down
26 changes: 26 additions & 0 deletions node/pkg/watchers/evm/connectors/finality.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package connectors

import (
"fmt"
)

type FinalityLevel uint8

const (
Latest FinalityLevel = iota
Safe
Finalized
)

func (f FinalityLevel) String() string {
if f == Latest {
return "Latest"
}
if f == Safe {
return "Safe"
}
if f == Finalized {
return "Finalized"
}
return fmt.Sprintf("unknown(%d)", f)
}
34 changes: 19 additions & 15 deletions node/pkg/watchers/evm/connectors/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,14 @@ func (b *BlockPollConnector) runFromSupervisor(ctx context.Context) error {
}

func (b *BlockPollConnector) run(ctx context.Context, logger *zap.Logger) error {
lastBlock, err := b.getBlock(ctx, logger, nil, false)
lastBlock, err := b.getBlock(ctx, logger, nil, Finalized)
if err != nil {
return err
}

var lastSafeBlock *NewBlock
if b.publishSafeBlocks {
lastSafeBlock, err = b.getBlock(ctx, logger, nil, true)
lastSafeBlock, err = b.getBlock(ctx, logger, nil, Safe)
if err != nil {
return err
}
Expand All @@ -82,7 +82,7 @@ func (b *BlockPollConnector) run(ctx context.Context, logger *zap.Logger) error
return ctx.Err()
case <-timer.C:
for count := 0; count < 3; count++ {
lastBlock, err = b.pollBlocks(ctx, logger, lastBlock, false)
lastBlock, err = b.pollBlocks(ctx, logger, lastBlock, Finalized)
if err == nil {
break
}
Expand All @@ -95,7 +95,7 @@ func (b *BlockPollConnector) run(ctx context.Context, logger *zap.Logger) error

if err == nil && b.publishSafeBlocks {
for count := 0; count < 3; count++ {
lastSafeBlock, err = b.pollBlocks(ctx, logger, lastSafeBlock, true)
lastSafeBlock, err = b.pollBlocks(ctx, logger, lastSafeBlock, Safe)
if err == nil {
break
}
Expand All @@ -114,7 +114,7 @@ func (b *BlockPollConnector) run(ctx context.Context, logger *zap.Logger) error
}
}

func (b *BlockPollConnector) pollBlocks(ctx context.Context, logger *zap.Logger, lastBlock *NewBlock, safe bool) (lastPublishedBlock *NewBlock, retErr error) {
func (b *BlockPollConnector) pollBlocks(ctx context.Context, logger *zap.Logger, lastBlock *NewBlock, desiredFinality FinalityLevel) (lastPublishedBlock *NewBlock, retErr error) {
// Some of the testnet providers (like the one we are using for Arbitrum) limit how many transactions we can do. When that happens, the call hangs.
// Use a timeout so that the call will fail and the runable will get restarted. This should not happen in mainnet, but if it does, we will need to
// investigate why the runable is dying and fix the underlying problem.
Expand All @@ -124,7 +124,7 @@ func (b *BlockPollConnector) pollBlocks(ctx context.Context, logger *zap.Logger,
// Fetch the latest block on the chain
// We could do this on every iteration such that if a new block is created while this function is being executed,
// it would automatically fetch new blocks but in order to reduce API load this will be done on the next iteration.
latestBlock, err := b.getBlockWithTimeout(ctx, logger, nil, safe)
latestBlock, err := b.getBlockWithTimeout(ctx, logger, nil, desiredFinality)
if err != nil {
logger.Error("failed to look up latest block",
zap.Uint64("lastSeenBlock", lastBlock.Number.Uint64()), zap.Error(err))
Expand All @@ -138,7 +138,7 @@ func (b *BlockPollConnector) pollBlocks(ctx context.Context, logger *zap.Logger,

// Try to fetch the next block between lastBlock and latestBlock
nextBlockNumber := new(big.Int).Add(lastPublishedBlock.Number, big.NewInt(1))
block, err := b.getBlockWithTimeout(ctx, logger, nextBlockNumber, safe)
block, err := b.getBlockWithTimeout(ctx, logger, nextBlockNumber, desiredFinality)
if err != nil {
logger.Error("failed to fetch next block",
zap.Uint64("block", nextBlockNumber.Uint64()), zap.Error(err))
Expand All @@ -163,10 +163,10 @@ func (b *BlockPollConnector) pollBlocks(ctx context.Context, logger *zap.Logger,
return
}

func (b *BlockPollConnector) getBlockWithTimeout(ctx context.Context, logger *zap.Logger, blockNumber *big.Int, safe bool) (*NewBlock, error) {
func (b *BlockPollConnector) getBlockWithTimeout(ctx context.Context, logger *zap.Logger, blockNumber *big.Int, desiredFinality FinalityLevel) (*NewBlock, error) {
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
return b.getBlock(timeout, logger, blockNumber, safe)
return b.getBlock(timeout, logger, blockNumber, desiredFinality)
}

func (b *BlockPollConnector) isBlockFinalizedWithTimeout(ctx context.Context, block *NewBlock) (bool, error) {
Expand Down Expand Up @@ -204,20 +204,24 @@ func (b *BlockPollConnector) SubscribeForBlocks(ctx context.Context, errC chan e
return sub, nil
}

func (b *BlockPollConnector) getBlock(ctx context.Context, logger *zap.Logger, number *big.Int, safe bool) (*NewBlock, error) {
return getBlock(ctx, logger, b.Connector, number, b.useFinalized, safe)
func (b *BlockPollConnector) getBlock(ctx context.Context, logger *zap.Logger, number *big.Int, desiredFinality FinalityLevel) (*NewBlock, error) {
return getBlock(ctx, logger, b.Connector, number, b.useFinalized, desiredFinality)
}

// getBlock is a free function that can be called from other connectors to get a single block.
func getBlock(ctx context.Context, logger *zap.Logger, conn Connector, number *big.Int, useFinalized bool, safe bool) (*NewBlock, error) {
func getBlock(ctx context.Context, logger *zap.Logger, conn Connector, number *big.Int, useFinalized bool, desiredFinality FinalityLevel) (*NewBlock, error) {
var numStr string
if number != nil {
numStr = ethHexUtils.EncodeBig(number)
} else if useFinalized {
if safe {
if desiredFinality == Safe {
numStr = "safe"
} else {
} else if desiredFinality == Finalized {
numStr = "finalized"
} else if desiredFinality == Latest {
numStr = "latest"
} else {
panic("invalid finality level")
}
} else {
numStr = "latest"
Expand Down Expand Up @@ -248,6 +252,6 @@ func getBlock(ctx context.Context, logger *zap.Logger, conn Connector, number *b
Number: &n,
Hash: m.Hash,
L1BlockNumber: l1bn,
Safe: safe,
Finality: desiredFinality,
}, nil
}
2 changes: 1 addition & 1 deletion node/pkg/watchers/evm/connectors/polygon.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (c *PolygonConnector) postBlock(ctx context.Context, blockNum *big.Int, sin
return fmt.Errorf("blockNum is nil")
}

block, err := getBlock(ctx, c.logger, c.Connector, blockNum, false, false)
block, err := getBlock(ctx, c.logger, c.Connector, blockNum, false, Finalized)
if err != nil {
return fmt.Errorf("failed to get block %s: %w", blockNum.String(), err)
}
Expand Down
30 changes: 15 additions & 15 deletions node/pkg/watchers/evm/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,16 +653,16 @@ func (w *Watcher) Run(parentCtx context.Context) error {
continue
}
if ev.Number == nil {
logger.Error("new header block number is nil", zap.String("eth_network", w.networkName), zap.Bool("is_safe_block", ev.Safe))
logger.Error("new header block number is nil", zap.String("eth_network", w.networkName), zap.Stringer("finality", ev.Finality))
continue
}

start := time.Now()
currentHash := ev.Hash
logger.Debug("processing new header",
logger.Info("processing new header",
zap.Stringer("current_block", ev.Number),
zap.Stringer("current_blockhash", currentHash),
zap.Bool("is_safe_block", ev.Safe),
zap.Stringer("finality", ev.Finality),
zap.String("eth_network", w.networkName))
readiness.SetReady(w.readinessSync)
p2p.DefaultRegistry.SetNetworkStats(w.chainID, &gossipv1.Heartbeat_Network{
Expand All @@ -673,7 +673,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
w.pendingMu.Lock()

blockNumberU := ev.Number.Uint64()
if ev.Safe {
if ev.Finality == connectors.Safe {
atomic.StoreUint64(&w.latestSafeBlockNumber, blockNumberU)
} else {
atomic.StoreUint64(&w.latestFinalizedBlockNumber, blockNumberU)
Expand All @@ -684,13 +684,13 @@ func (w *Watcher) Run(parentCtx context.Context) error {
// If this block is safe, only process messages wanting safe.
// If it's not safe, only process messages wanting finalized.
if w.safeBlocksSupported {
if ev.Safe != (pLock.message.ConsistencyLevel == vaa.ConsistencyLevelSafe) {
if (ev.Finality == connectors.Safe) != (pLock.message.ConsistencyLevel == vaa.ConsistencyLevelSafe) {
continue
}
}

var expectedConfirmations uint64
if w.waitForConfirmations && !ev.Safe {
if w.waitForConfirmations && ev.Finality != connectors.Safe {
expectedConfirmations = uint64(pLock.message.ConsistencyLevel)
}

Expand All @@ -702,7 +702,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
zap.Stringer("emitter_address", key.EmitterAddress),
zap.Uint64("sequence", key.Sequence),
zap.Stringer("current_block", ev.Number),
zap.Bool("is_safe_block", ev.Safe),
zap.Stringer("finality", ev.Finality),
zap.Stringer("current_blockhash", currentHash),
zap.String("eth_network", w.networkName),
zap.Uint64("expectedConfirmations", expectedConfirmations),
Expand Down Expand Up @@ -734,7 +734,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
zap.Stringer("emitter_address", key.EmitterAddress),
zap.Uint64("sequence", key.Sequence),
zap.Stringer("current_block", ev.Number),
zap.Bool("is_safe_block", ev.Safe),
zap.Stringer("finality", ev.Finality),
zap.Stringer("current_blockhash", currentHash),
zap.String("eth_network", w.networkName),
zap.Error(err))
Expand All @@ -753,7 +753,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
zap.Stringer("emitter_address", key.EmitterAddress),
zap.Uint64("sequence", key.Sequence),
zap.Stringer("current_block", ev.Number),
zap.Bool("is_safe_block", ev.Safe),
zap.Stringer("finality", ev.Finality),
zap.Stringer("current_blockhash", currentHash),
zap.String("eth_network", w.networkName),
zap.Error(err))
Expand All @@ -770,7 +770,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
zap.Stringer("emitter_address", key.EmitterAddress),
zap.Uint64("sequence", key.Sequence),
zap.Stringer("current_block", ev.Number),
zap.Bool("is_safe_block", ev.Safe),
zap.Stringer("finality", ev.Finality),
zap.Stringer("current_blockhash", currentHash),
zap.String("eth_network", w.networkName),
zap.Error(err))
Expand All @@ -787,7 +787,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
zap.Stringer("emitter_address", key.EmitterAddress),
zap.Uint64("sequence", key.Sequence),
zap.Stringer("current_block", ev.Number),
zap.Bool("is_safe_block", ev.Safe),
zap.Stringer("finality", ev.Finality),
zap.Stringer("current_blockhash", currentHash),
zap.String("eth_network", w.networkName))
delete(w.pending, key)
Expand All @@ -801,7 +801,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
zap.Stringer("emitter_address", key.EmitterAddress),
zap.Uint64("sequence", key.Sequence),
zap.Stringer("current_block", ev.Number),
zap.Bool("is_safe_block", ev.Safe),
zap.Stringer("finality", ev.Finality),
zap.Stringer("current_blockhash", currentHash),
zap.String("eth_network", w.networkName))
delete(w.pending, key)
Expand All @@ -811,9 +811,9 @@ func (w *Watcher) Run(parentCtx context.Context) error {
}

w.pendingMu.Unlock()
logger.Debug("processed new header",
logger.Info("processed new header",
zap.Stringer("current_block", ev.Number),
zap.Bool("is_safe_block", ev.Safe),
zap.Stringer("finality", ev.Finality),
zap.Stringer("current_blockhash", currentHash),
zap.Duration("took", time.Since(start)),
zap.String("eth_network", w.networkName))
Expand All @@ -839,7 +839,7 @@ func (w *Watcher) fetchAndUpdateGuardianSet(
ethConn connectors.Connector,
) error {
msm := time.Now()
logger.Debug("fetching guardian set")
logger.Info("fetching guardian set")
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
idx, gs, err := fetchCurrentGuardianSet(timeout, ethConn)
Expand Down

0 comments on commit a7d2076

Please sign in to comment.