Node/EVM: Track latest block #3470

Merged on Nov 2, 2023 (4 commits)
20 changes: 9 additions & 11 deletions node/cmd/guardiand/node.go
@@ -1045,19 +1045,17 @@ func runNode(cmd *cobra.Command, args []string) {
 	}
 
 	if shouldStart(polygonRPC) {
-		// Checkpointing is required in mainnet, so we don't need to wait for confirmations.
-		waitForConfirmations := *unsafeDevMode || *testnetMode
-		if !waitForConfirmations && *polygonRootChainRpc == "" {
-			log.Fatal("Polygon checkpointing is required in mainnet")
+		// Checkpointing is required in mainnet and testnet.
+		if !*unsafeDevMode && *polygonRootChainRpc == "" {
+			log.Fatal("Polygon checkpointing is required in mainnet and testnet")
 		}
 		wc := &evm.WatcherConfig{
-			NetworkID:            "polygon",
-			ChainID:              vaa.ChainIDPolygon,
-			Rpc:                  *polygonRPC,
-			Contract:             *polygonContract,
-			WaitForConfirmations: waitForConfirmations,
-			RootChainRpc:         *polygonRootChainRpc,
-			RootChainContract:    *polygonRootChainContractAddress,
+			NetworkID:         "polygon",
+			ChainID:           vaa.ChainIDPolygon,
+			Rpc:               *polygonRPC,
+			Contract:          *polygonContract,
+			RootChainRpc:      *polygonRootChainRpc,
+			RootChainContract: *polygonRootChainContractAddress,
 		}
 
 		watcherConfigs = append(watcherConfigs, wc)
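
With WaitForConfirmations removed, checkpointing via --polygonRootChainRpc is mandatory in both mainnet and testnet; only --unsafeDevMode may run without it. A minimal standalone sketch of the new guard, using hypothetical plain variables in place of the cobra flag pointers (this helper does not exist in the PR):

package example

import "errors"

// Hypothetical standalone version of the guard added above; unsafeDevMode and
// polygonRootChainRpc stand in for the parsed CLI flag values.
func validatePolygonCheckpointing(unsafeDevMode bool, polygonRootChainRpc string) error {
	// Checkpointing is required in mainnet and testnet; only dev mode may omit it.
	if !unsafeDevMode && polygonRootChainRpc == "" {
		return errors.New("Polygon checkpointing is required in mainnet and testnet")
	}
	return nil
}
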
2 changes: 1 addition & 1 deletion node/hack/parse_eth_tx/parse_eth_tx.go
@@ -45,7 +45,7 @@ func main() {
 			log.Fatalf("dialing eth client failed: %v", err)
 		}
 	} else {
-		ethIntf, err = connectors.NewEthereumConnector(ctx, "", *flagEthRPC, contractAddr, zap.L())
+		ethIntf, err = connectors.NewEthereumBaseConnector(ctx, "", *flagEthRPC, contractAddr, zap.L())
 		if err != nil {
 			log.Fatalf("dialing eth client failed: %v", err)
 		}
5 changes: 5 additions & 0 deletions node/pkg/adminrpc/adminserver_test.go
@@ -14,6 +14,7 @@ import (
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/types"
 	ethcrypto "github.com/ethereum/go-ethereum/crypto"
+	"github.com/ethereum/go-ethereum/ethclient"
 	"github.com/ethereum/go-ethereum/event"
 	ethRpc "github.com/ethereum/go-ethereum/rpc"
 	"github.com/stretchr/testify/require"
@@ -73,6 +74,10 @@ func (m mockEVMConnector) RawBatchCallContext(ctx context.Context, b []ethRpc.Ba
 	panic("unimplemented")
 }
 
+func (c mockEVMConnector) Client() *ethclient.Client {
+	panic("unimplemented")
+}
+
 func generateGS(num int) (keys []*ecdsa.PrivateKey, addrs []common.Address) {
 	for i := 0; i < num; i++ {
 		key, err := ethcrypto.GenerateKey()
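
This stub suggests the Connector interface gained a Client() *ethclient.Client accessor in this PR (an assumption based on this test change alone). A hypothetical real implementation that already holds a dialed go-ethereum client would simply return it:

package example

import "github.com/ethereum/go-ethereum/ethclient"

// Hypothetical connector wrapping a dialed go-ethereum client; the field name
// is illustrative and not necessarily how EthereumBaseConnector stores it.
type wrappedConnector struct {
	client *ethclient.Client
}

// Client returns the underlying ethclient handle, satisfying the (assumed)
// new Connector interface method.
func (c *wrappedConnector) Client() *ethclient.Client {
	return c.client
}
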
2 changes: 1 addition & 1 deletion node/pkg/node/adminServiceRunnable.go
@@ -75,7 +75,7 @@ func adminServiceRunnable(
 	var evmConnector connectors.Connector
 	if ethRpc != nil && ethContract != nil {
 		contract := ethcommon.HexToAddress(*ethContract)
-		evmConnector, err = connectors.NewEthereumConnector(ctx, "eth", *ethRpc, contract, logger)
+		evmConnector, err = connectors.NewEthereumBaseConnector(ctx, "eth", *ethRpc, contract, logger)
 		if err != nil {
 			return nil, fmt.Errorf("failed to connecto to ethereum")
 		}
329 changes: 176 additions & 153 deletions node/pkg/proto/gossip/v1/gossip.pb.go

Large diffs are not rendered by default.

286 changes: 286 additions & 0 deletions node/pkg/watchers/evm/connectors/batch_poller.go
@@ -0,0 +1,286 @@
package connectors

import (
"context"
"fmt"
"math/big"
"time"

"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/supervisor"
ethEvent "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"

ethereum "github.com/ethereum/go-ethereum"

"go.uber.org/zap"
)

// BatchPollConnector uses batch requests to poll for latest, safe and finalized blocks.
type BatchPollConnector struct {
Connector
Delay time.Duration
blockFeed ethEvent.Feed
errFeed ethEvent.Feed
batchData []BatchEntry
}

type (
Blocks []*NewBlock

BatchEntry struct {
tag string
finality FinalityLevel
}

BatchResult struct {
result BlockMarshaller
err error
}
)

const MAX_GAP_BATCH_SIZE uint64 = 5

func NewBatchPollConnector(ctx context.Context, baseConnector Connector, delay time.Duration) (*BatchPollConnector, error) {
// Create the batch data in the order we want to report them to the watcher, so finalized is most important, latest is least.
batchData := []BatchEntry{
{tag: "finalized", finality: Finalized},
{tag: "safe", finality: Safe},
{tag: "latest", finality: Latest},
}

connector := &BatchPollConnector{
Connector: baseConnector,
Delay: delay,
batchData: batchData,
}
err := supervisor.Run(ctx, "batchPoller", common.WrapWithScissors(connector.runFromSupervisor, "batchPoller"))
if err != nil {
return nil, err
}
return connector, nil
}

func (b *BatchPollConnector) SubscribeForBlocks(ctx context.Context, errC chan error, sink chan<- *NewBlock) (ethereum.Subscription, error) {
sub := NewPollSubscription()
blockSub := b.blockFeed.Subscribe(sink)

// The feed library does not support error forwarding, so we're emulating that using a custom subscription and
// an error feed. The feed library can't handle interfaces which is why we post strings.
innerErrSink := make(chan string, 10)
innerErrSub := b.errFeed.Subscribe(innerErrSink)

common.RunWithScissors(ctx, errC, "block_poll_subscribe_for_blocks", func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
blockSub.Unsubscribe()
innerErrSub.Unsubscribe()
return nil
case <-sub.quit:
blockSub.Unsubscribe()
innerErrSub.Unsubscribe()
sub.unsubDone <- struct{}{}
return nil
case v := <-innerErrSink:
sub.err <- fmt.Errorf(v)
}
}
})
return sub, nil
}

func (b *BatchPollConnector) runFromSupervisor(ctx context.Context) error {
logger := supervisor.Logger(ctx).With(zap.String("eth_network", b.Connector.NetworkName()))
supervisor.Signal(ctx, supervisor.SignalHealthy)
return b.run(ctx, logger)
}

func (b *BatchPollConnector) run(ctx context.Context, logger *zap.Logger) error {
// Get the initial blocks.
lastBlocks, err := b.getBlocks(ctx, logger)
if err != nil {
return err
}

timer := time.NewTimer(b.Delay)
defer timer.Stop()

errCount := 0
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
lastBlocks, err = b.pollBlocks(ctx, logger, lastBlocks)
if err != nil {
errCount++
logger.Error("batch polling encountered an error", zap.Int("errCount", errCount), zap.Error(err))
if errCount > 3 {
b.errFeed.Send(fmt.Sprint("polling encountered an error: ", err))
errCount = 0
}
} else {
errCount = 0
}

timer.Reset(b.Delay)
}
}
}

// pollBlocks polls for the latest blocks (finalized, safe and latest), compares them to the last ones, and publishes any new ones.
// In the case of an error, it returns the last blocks that were passed in, otherwise it returns the new blocks.
func (b *BatchPollConnector) pollBlocks(ctx context.Context, logger *zap.Logger, prevBlocks Blocks) (Blocks, error) {
newBlocks, err := b.getBlocks(ctx, logger)
if err != nil {
return prevBlocks, err
}

if len(newBlocks) != len(prevBlocks) {
panic(fmt.Sprintf("getBlocks returned %d entries when there should be %d", len(newBlocks), len(prevBlocks)))
}

for idx, newBlock := range newBlocks {
if newBlock.Number.Cmp(prevBlocks[idx].Number) > 0 {
// If there is a gap between prev and new, we have to look up the transaction hashes for the missing ones. Do that in batches.
newBlockNum := newBlock.Number.Uint64()
blockNum := prevBlocks[idx].Number.Uint64() + 1
for blockNum < newBlockNum {
batchSize := newBlockNum - blockNum
if batchSize > MAX_GAP_BATCH_SIZE {
batchSize = MAX_GAP_BATCH_SIZE
}
gapBlocks, err := b.getBlockRange(ctx, logger, blockNum, batchSize, b.batchData[idx].finality)
if err != nil {
return prevBlocks, fmt.Errorf("failed to get gap blocks: %w", err)
}
for _, block := range gapBlocks {
b.blockFeed.Send(block)
}
blockNum += batchSize
}

b.blockFeed.Send(newBlock)
} else if newBlock.Number.Cmp(prevBlocks[idx].Number) < 0 {
logger.Warn("latest block number went backwards, ignoring it", zap.Any("newLatest", newBlock.Number), zap.Any("prevLatest", prevBlocks[idx].Number))
newBlocks[idx] = prevBlocks[idx]
}
}

return newBlocks, nil
}

// getBlocks gets the current batch of configured blocks (finalized, safe, latest).
func (b *BatchPollConnector) getBlocks(ctx context.Context, logger *zap.Logger) (Blocks, error) {
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()

batch := make([]rpc.BatchElem, len(b.batchData))
results := make([]BatchResult, len(b.batchData))
for idx, bd := range b.batchData {
batch[idx] = rpc.BatchElem{
Method: "eth_getBlockByNumber",
Args: []interface{}{
bd.tag,
false, // no full transaction details
},
Result: &results[idx].result,
Error: results[idx].err,
}
}

err := b.Connector.RawBatchCallContext(timeout, batch)
if err != nil {
logger.Error("failed to get blocks", zap.Error(err))
return nil, err
}

ret := make(Blocks, len(b.batchData))
for idx, result := range results {
finality := b.batchData[idx].finality
if result.err != nil {
logger.Error("failed to get block", zap.Stringer("finality", finality), zap.Error(result.err))
return nil, err
}

m := &result.result
if m.Number == nil {
logger.Error("failed to unmarshal block: Number is nil", zap.Stringer("finality", finality), zap.String("tag", b.batchData[idx].tag))
return nil, fmt.Errorf("failed to unmarshal block: Number is nil")
}
n := big.Int(*m.Number)

var l1bn *big.Int
if m.L1BlockNumber != nil {
bn := big.Int(*m.L1BlockNumber)
l1bn = &bn
}

ret[idx] = &NewBlock{
Number: &n,
Hash: m.Hash,
L1BlockNumber: l1bn,
Finality: finality,
}
}

return ret, nil
}

// getBlockRange gets a range of blocks, starting at blockNum, including the next numBlocks. It passes back an array of those blocks.
func (b *BatchPollConnector) getBlockRange(ctx context.Context, logger *zap.Logger, blockNum uint64, numBlocks uint64, finality FinalityLevel) (Blocks, error) {
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()

batch := make([]rpc.BatchElem, numBlocks)
results := make([]BatchResult, numBlocks)
for idx := 0; idx < int(numBlocks); idx++ {
batch[idx] = rpc.BatchElem{
Method: "eth_getBlockByNumber",
Args: []interface{}{
"0x" + fmt.Sprintf("%x", blockNum),
false, // no full transaction details
},
Result: &results[idx].result,
Error: results[idx].err,
}
blockNum++
}

err := b.Connector.RawBatchCallContext(timeout, batch)
if err != nil {
logger.Error("failed to get blocks", zap.Error(err))
return nil, err
}

ret := make(Blocks, numBlocks)
for idx, result := range results {
if result.err != nil {
logger.Error("failed to get block", zap.Int("idx", idx), zap.Stringer("finality", finality), zap.Error(result.err))
return nil, err
}

m := &result.result
if m.Number == nil {
logger.Error("failed to unmarshal block: Number is nil")
return nil, fmt.Errorf("failed to unmarshal block: Number is nil")
}
n := big.Int(*m.Number)

var l1bn *big.Int
if m.L1BlockNumber != nil {
bn := big.Int(*m.L1BlockNumber)
l1bn = &bn
}

ret[idx] = &NewBlock{
Number: &n,
Hash: m.Hash,
L1BlockNumber: l1bn,
Finality: finality,
}
}

return ret, nil
}
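
For orientation, here is a hedged sketch of how a caller might wire this poller up, based only on the signatures visible in this PR; the surrounding watcher plumbing, the channel sizes, the "polygon" network name, and the 250ms delay are assumptions, and ctx must come from a supervisor tree because NewBatchPollConnector registers its polling loop via supervisor.Run:

package example

import (
	"context"
	"fmt"
	"time"

	"github.com/certusone/wormhole/node/pkg/watchers/evm/connectors"
	ethcommon "github.com/ethereum/go-ethereum/common"
	"go.uber.org/zap"
)

// trackBlocks is a sketch, not part of this PR: it layers a BatchPollConnector
// on top of a base connector and logs every block the poller publishes.
func trackBlocks(ctx context.Context, rpcURL string, contractAddr ethcommon.Address, logger *zap.Logger) error {
	baseConnector, err := connectors.NewEthereumBaseConnector(ctx, "polygon", rpcURL, contractAddr, logger)
	if err != nil {
		return fmt.Errorf("failed to create base connector: %w", err)
	}

	// The 250ms polling delay is an arbitrary example value.
	poller, err := connectors.NewBatchPollConnector(ctx, baseConnector, 250*time.Millisecond)
	if err != nil {
		return fmt.Errorf("failed to create batch poller: %w", err)
	}

	errC := make(chan error, 10)
	blocks := make(chan *connectors.NewBlock, 64)
	sub, err := poller.SubscribeForBlocks(ctx, errC, blocks)
	if err != nil {
		return fmt.Errorf("failed to subscribe for blocks: %w", err)
	}
	defer sub.Unsubscribe()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case err := <-sub.Err():
			logger.Error("subscription error", zap.Error(err))
		case err := <-errC:
			logger.Error("connector error", zap.Error(err))
		case block := <-blocks:
			// Each block arrives tagged with the finality level it was polled at
			// (Finalized, Safe or Latest).
			logger.Info("new block",
				zap.Stringer("number", block.Number),
				zap.Stringer("finality", block.Finality))
		}
	}
}

Note that pollBlocks back-fills gaps between polls: if, say, the latest block jumps from 100 to 108, blocks 101-105 and then 106-107 are fetched via batched eth_getBlockByNumber calls (capped at MAX_GAP_BATCH_SIZE = 5 per batch) before block 108 itself is published.
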