-
Notifications
You must be signed in to change notification settings - Fork 33
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature/scribe chain backfill #146
Changes from 4 commits
83cfb26
786b32a
410c38d
96018be
bc3803b
6414c62
0409f3d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
package backfill | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
"github.com/synapsecns/sanguine/services/scribe/config" | ||
"github.com/synapsecns/sanguine/services/scribe/db" | ||
"github.com/synapsecns/synapse-node/pkg/evm/client" | ||
"golang.org/x/sync/errgroup" | ||
) | ||
|
||
// ChainBackfiller is a backfiller that fetches logs for a chain. It aggregates logs | ||
// from a slice of ContractBackfillers. | ||
type ChainBackfiller struct { | ||
// eventDB is the database to store event data in | ||
eventDB db.EventDB | ||
// client is the client for filtering | ||
client client.EVMClient | ||
// contractBackfillers is the list of contract backfillers | ||
contractBackfillers []*ContractBackfiller | ||
// chainConfig is the config for the backfiller | ||
chainConfig config.ChainConfig | ||
} | ||
|
||
// NewChainBackfiller creates a new backfiller for a chain. | ||
func NewChainBackfiller(eventDB db.EventDB, client client.EVMClient, chainConfig config.ChainConfig) (*ChainBackfiller, error) { | ||
// initialize the list of contract backfillers | ||
contractBackfillers := []*ContractBackfiller{} | ||
// initialize each contract backfiller | ||
for name, contract := range chainConfig.Contracts { | ||
contractBackfiller, err := NewContractBackfiller(name, chainConfig.ChainID, contract.Address, eventDB, client) | ||
if err != nil { | ||
return nil, fmt.Errorf("could not create contract backfiller: %w", err) | ||
} | ||
contractBackfillers = append(contractBackfillers, contractBackfiller) | ||
} | ||
|
||
return &ChainBackfiller{ | ||
eventDB: eventDB, | ||
client: client, | ||
contractBackfillers: contractBackfillers, | ||
chainConfig: chainConfig, | ||
}, nil | ||
} | ||
|
||
// Backfill iterates over each contract backfiller and calls Backfill concurrently on each one. | ||
func (c ChainBackfiller) Backfill(ctx context.Context, endHeight uint64) error { | ||
// initialize the errgroup | ||
g, ctx := errgroup.WithContext(ctx) | ||
// iterate over each contract backfiller | ||
for _, contractBackfiller := range c.contractBackfillers { | ||
// capture func literal | ||
contractBackfiller := contractBackfiller | ||
// get the start height for the backfill | ||
startHeight := c.chainConfig.Contracts[contractBackfiller.mapName].StartBlock | ||
// call Backfill concurrently | ||
g.Go(func() error { | ||
err := contractBackfiller.Backfill(ctx, startHeight, endHeight) | ||
if err != nil { | ||
return fmt.Errorf("could not backfill contract: %w", err) | ||
} | ||
return nil | ||
}) | ||
} | ||
// wait for all of the backfillers to finish | ||
if err := g.Wait(); err != nil { | ||
return fmt.Errorf("could not backfill: %w", err) | ||
} | ||
|
||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,182 @@ | ||
package backfill_test | ||
|
||
import ( | ||
"math/big" | ||
|
||
"github.com/ethereum/go-ethereum/params" | ||
. "github.com/stretchr/testify/assert" | ||
"github.com/synapsecns/sanguine/agents/contracts/testcontract" | ||
"github.com/synapsecns/sanguine/agents/testutil" | ||
"github.com/synapsecns/sanguine/ethergo/backends/simulated" | ||
"github.com/synapsecns/sanguine/ethergo/contracts" | ||
"github.com/synapsecns/sanguine/services/scribe/backfill" | ||
"github.com/synapsecns/sanguine/services/scribe/config" | ||
) | ||
|
||
// TestConfirmations tests that data will not be added if a specified amount of blocks | ||
// have not passed before the block that the data belongs to. | ||
func (b BackfillSuite) TestConfirmations() { | ||
// Get simulated blockchain, deploy three test contracts, and set up test variables. | ||
simulatedChain := simulated.NewSimulatedBackendWithChainID(b.GetTestContext(), b.T(), big.NewInt(4)) | ||
simulatedChain.FundAccount(b.GetTestContext(), b.wallet.Address(), *big.NewInt(params.Ether)) | ||
testContract, testRef := b.manager.GetTestContract(b.GetTestContext(), simulatedChain) | ||
// Create a second test contract just meant to pass blocks. | ||
dummyManager := testutil.NewDeployManager(b.T()) | ||
_, dummyRef := dummyManager.GetTestContract(b.GetTestContext(), simulatedChain) | ||
transactOpts := simulatedChain.GetTxContext(b.GetTestContext(), nil) | ||
// Set up the config. | ||
deployTxHash := testContract.DeployTx().Hash() | ||
receipt, err := simulatedChain.TransactionReceipt(b.GetTestContext(), deployTxHash) | ||
Nil(b.T(), err) | ||
startBlock := receipt.BlockNumber.Uint64() | ||
contractConfigs := make(config.ContractConfigs) | ||
contractConfigs["TestContract"] = config.ContractConfig{ | ||
Address: testContract.Address().String(), | ||
StartBlock: startBlock, | ||
} | ||
chainConfig := config.ChainConfig{ | ||
ChainID: 4, | ||
RPCUrl: "an rpc url is not needed for simulated backends", | ||
ConfirmationThreshold: 2, | ||
Contracts: contractConfigs, | ||
} | ||
|
||
// Set up the ChainBackfiller. | ||
chainBackfiller, err := backfill.NewChainBackfiller(b.testDB, simulatedChain, chainConfig) | ||
Nil(b.T(), err) | ||
|
||
// Emit three events from two transactions. | ||
tx, err := testRef.EmitEventA(transactOpts.TransactOpts, big.NewInt(1), big.NewInt(2), big.NewInt(3)) | ||
Nil(b.T(), err) | ||
simulatedChain.WaitForConfirmation(b.GetTestContext(), tx) | ||
tx, err = testRef.EmitEventAandB(transactOpts.TransactOpts, big.NewInt(4), big.NewInt(5), big.NewInt(6)) | ||
Nil(b.T(), err) | ||
simulatedChain.WaitForConfirmation(b.GetTestContext(), tx) | ||
|
||
// Use the dummy contract to pass two blocks. | ||
tx, err = dummyRef.EmitEventA(transactOpts.TransactOpts, big.NewInt(1), big.NewInt(2), big.NewInt(3)) | ||
Nil(b.T(), err) | ||
simulatedChain.WaitForConfirmation(b.GetTestContext(), tx) | ||
tx, err = dummyRef.EmitEventA(transactOpts.TransactOpts, big.NewInt(1), big.NewInt(2), big.NewInt(3)) | ||
Nil(b.T(), err) | ||
simulatedChain.WaitForConfirmation(b.GetTestContext(), tx) | ||
|
||
// Backfill the first batch of events. | ||
latestBlock, err := simulatedChain.BlockNumber(b.GetTestContext()) | ||
Nil(b.T(), err) | ||
err = chainBackfiller.Backfill(b.GetTestContext(), latestBlock-uint64(chainConfig.ConfirmationThreshold)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. setting the endHeight of the backfiller will be the job of the |
||
Nil(b.T(), err) | ||
|
||
// Check that the first batch of events were added to the database. | ||
logs, err := b.testDB.UnsafeRetrieveAllLogs(b.GetTestContext(), true, chainConfig.ChainID, testContract.Address()) | ||
Nil(b.T(), err) | ||
Equal(b.T(), 3, len(logs)) | ||
|
||
receipts, err := b.testDB.UnsafeRetrieveAllReceipts(b.GetTestContext(), true, chainConfig.ChainID) | ||
Nil(b.T(), err) | ||
Equal(b.T(), 2, len(receipts)) | ||
|
||
// Send one more transaction. | ||
tx, err = testRef.EmitEventB(transactOpts.TransactOpts, []byte{7}, big.NewInt(8), big.NewInt(9)) | ||
Nil(b.T(), err) | ||
simulatedChain.WaitForConfirmation(b.GetTestContext(), tx) | ||
|
||
// Backfill before the confirmation threshold has passed. | ||
latestBlock, err = simulatedChain.BlockNumber(b.GetTestContext()) | ||
Nil(b.T(), err) | ||
err = chainBackfiller.Backfill(b.GetTestContext(), latestBlock-uint64(chainConfig.ConfirmationThreshold)) | ||
Nil(b.T(), err) | ||
|
||
// Check that the second batch of events were not added to the database. | ||
logs, err = b.testDB.UnsafeRetrieveAllLogs(b.GetTestContext(), true, chainConfig.ChainID, testContract.Address()) | ||
Nil(b.T(), err) | ||
Equal(b.T(), 3, len(logs)) | ||
|
||
receipts, err = b.testDB.UnsafeRetrieveAllReceipts(b.GetTestContext(), true, chainConfig.ChainID) | ||
Nil(b.T(), err) | ||
Equal(b.T(), 2, len(receipts)) | ||
} | ||
|
||
// TestChainBackfill tests the ChainBackfiller's ability to backfill a chain. | ||
func (b BackfillSuite) TestChainBackfill() { | ||
// We need to set up multiple deploy managers, one for each contract. We will use | ||
// b.manager for the first contract, and create a new ones for the next two. | ||
managerB := testutil.NewDeployManager(b.T()) | ||
managerC := testutil.NewDeployManager(b.T()) | ||
// Get simulated blockchain, deploy three test contracts, and set up test variables. | ||
simulatedChain := simulated.NewSimulatedBackendWithChainID(b.GetTestContext(), b.T(), big.NewInt(1)) | ||
simulatedChain.FundAccount(b.GetTestContext(), b.wallet.Address(), *big.NewInt(params.Ether)) | ||
testContractA, testRefA := b.manager.GetTestContract(b.GetTestContext(), simulatedChain) | ||
testContractB, testRefB := managerB.GetTestContract(b.GetTestContext(), simulatedChain) | ||
testContractC, testRefC := managerC.GetTestContract(b.GetTestContext(), simulatedChain) | ||
transactOpts := simulatedChain.GetTxContext(b.GetTestContext(), nil) | ||
// Put the contracts into a slice so we can iterate over them. | ||
contracts := []contracts.DeployedContract{testContractA, testContractB, testContractC} | ||
// Put the test refs into a slice so we can iterate over them. | ||
testRefs := []*testcontract.TestContractRef{testRefA, testRefB, testRefC} | ||
// Emit events from each contract. | ||
for _, testRef := range testRefs { | ||
tx, err := testRef.EmitEventA(transactOpts.TransactOpts, big.NewInt(1), big.NewInt(2), big.NewInt(3)) | ||
Nil(b.T(), err) | ||
simulatedChain.WaitForConfirmation(b.GetTestContext(), tx) | ||
tx, err = testRef.EmitEventB(transactOpts.TransactOpts, []byte{4}, big.NewInt(5), big.NewInt(6)) | ||
Nil(b.T(), err) | ||
simulatedChain.WaitForConfirmation(b.GetTestContext(), tx) | ||
tx, err = testRef.EmitEventAandB(transactOpts.TransactOpts, big.NewInt(7), big.NewInt(8), big.NewInt(9)) | ||
Nil(b.T(), err) | ||
simulatedChain.WaitForConfirmation(b.GetTestContext(), tx) | ||
} | ||
|
||
startBlocks := make([]uint64, len(contracts)) | ||
for i, contract := range contracts { | ||
deployTxHash := contract.DeployTx().Hash() | ||
receipt, err := simulatedChain.TransactionReceipt(b.GetTestContext(), deployTxHash) | ||
Nil(b.T(), err) | ||
startBlocks[i] = receipt.BlockNumber.Uint64() | ||
} | ||
// Set up the ChainConfig for the backfiller. | ||
contractConfigs := make(config.ContractConfigs) | ||
contractConfigs["TestContractA"] = config.ContractConfig{ | ||
Address: testContractA.Address().String(), | ||
StartBlock: startBlocks[0], | ||
} | ||
contractConfigs["TestContractB"] = config.ContractConfig{ | ||
Address: testContractB.Address().String(), | ||
StartBlock: startBlocks[1], | ||
} | ||
contractConfigs["TestContractC"] = config.ContractConfig{ | ||
Address: testContractC.Address().String(), | ||
StartBlock: startBlocks[2], | ||
} | ||
chainConfig := config.ChainConfig{ | ||
ChainID: 1, | ||
RPCUrl: "an rpc url is not needed for simulated backends", | ||
ConfirmationThreshold: 0, | ||
Contracts: contractConfigs, | ||
} | ||
|
||
// Set up the ChainBackfiller. | ||
chainBackfiller, err := backfill.NewChainBackfiller(b.testDB, simulatedChain, chainConfig) | ||
Nil(b.T(), err) | ||
// Backfill the chain. | ||
lastBlock, err := simulatedChain.BlockNumber(b.GetTestContext()) | ||
Nil(b.T(), err) | ||
err = chainBackfiller.Backfill(b.GetTestContext(), lastBlock) | ||
Nil(b.T(), err) | ||
|
||
// Check that the events were written to the database. | ||
for _, contract := range contracts { | ||
// Check the storage of logs. | ||
logs, err := b.testDB.UnsafeRetrieveAllLogs(b.GetTestContext(), true, chainConfig.ChainID, contract.Address()) | ||
Nil(b.T(), err) | ||
// There should be 4 logs. One from `EmitEventA`, one from `EmitEventB`, and two | ||
// from `EmitEventAandB`. | ||
Equal(b.T(), 4, len(logs)) | ||
} | ||
// Check the storage of receipts. | ||
receipts, err := b.testDB.UnsafeRetrieveAllReceipts(b.GetTestContext(), true, chainConfig.ChainID) | ||
Nil(b.T(), err) | ||
// There should be 9 receipts. One from `EmitEventA`, one from `EmitEventB`, and | ||
// one from `EmitEventAandB`, for each contract. | ||
Equal(b.T(), 9, len(receipts)) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still not sure about this error handling behavior here. An error on a contract in a given chain shouldn't throw an error on every contract on that chain. Maybe we need to throw this in a for-loop w/ a backoff? If you want to handle in a different pr can definitely make an issue and handle later, but definitely needs to be addressed
One thing to remember here is RPCs can be super flaky. Rate limits, data-availability issues, etc can all cause any of our networked calls (
eth_getLogs
,eth_getTransactionByHash
,eth_getReceipt
) to fail so we want to design these individual loops to be resilient.The goal here is to keep these things indexing as long as we can hit our most important gurantee- that we're not marking a range as indexed if it's not indexed