diff --git a/services/scribe/backfill/chain.go b/services/scribe/backfill/chain.go new file mode 100644 index 0000000000..03570007cf --- /dev/null +++ b/services/scribe/backfill/chain.go @@ -0,0 +1,72 @@ +package backfill + +import ( + "context" + "fmt" + + "github.com/synapsecns/sanguine/services/scribe/config" + "github.com/synapsecns/sanguine/services/scribe/db" + "github.com/synapsecns/synapse-node/pkg/evm/client" + "golang.org/x/sync/errgroup" +) + +// ChainBackfiller is a backfiller that fetches logs for a chain. It aggregates logs +// from a slice of ContractBackfillers. +type ChainBackfiller struct { + // eventDB is the database to store event data in + eventDB db.EventDB + // client is the client for filtering + client client.EVMClient + // contractBackfillers is the list of contract backfillers + contractBackfillers []*ContractBackfiller + // chainConfig is the config for the backfiller + chainConfig config.ChainConfig +} + +// NewChainBackfiller creates a new backfiller for a chain. +func NewChainBackfiller(eventDB db.EventDB, client client.EVMClient, chainConfig config.ChainConfig) (*ChainBackfiller, error) { + // initialize the list of contract backfillers + contractBackfillers := []*ContractBackfiller{} + // initialize each contract backfiller + for name, contract := range chainConfig.Contracts { + contractBackfiller, err := NewContractBackfiller(name, chainConfig.ChainID, contract.Address, eventDB, client) + if err != nil { + return nil, fmt.Errorf("could not create contract backfiller: %w", err) + } + contractBackfillers = append(contractBackfillers, contractBackfiller) + } + + return &ChainBackfiller{ + eventDB: eventDB, + client: client, + contractBackfillers: contractBackfillers, + chainConfig: chainConfig, + }, nil +} + +// Backfill iterates over each contract backfiller and calls Backfill concurrently on each one. +func (c ChainBackfiller) Backfill(ctx context.Context, endHeight uint64) error { + // initialize the errgroup + g, ctx := errgroup.WithContext(ctx) + // iterate over each contract backfiller + for _, contractBackfiller := range c.contractBackfillers { + // capture func literal + contractBackfiller := contractBackfiller + // get the start height for the backfill + startHeight := c.chainConfig.Contracts[contractBackfiller.mapName].StartBlock + // call Backfill concurrently + g.Go(func() error { + err := contractBackfiller.Backfill(ctx, startHeight, endHeight) + if err != nil { + return fmt.Errorf("could not backfill contract: %w", err) + } + return nil + }) + } + // wait for all of the backfillers to finish + if err := g.Wait(); err != nil { + return fmt.Errorf("could not backfill: %w", err) + } + + return nil +} diff --git a/services/scribe/backfill/chain_test.go b/services/scribe/backfill/chain_test.go new file mode 100644 index 0000000000..3c74e76adb --- /dev/null +++ b/services/scribe/backfill/chain_test.go @@ -0,0 +1,182 @@ +package backfill_test + +import ( + "math/big" + + "github.com/ethereum/go-ethereum/params" + . "github.com/stretchr/testify/assert" + "github.com/synapsecns/sanguine/agents/contracts/testcontract" + "github.com/synapsecns/sanguine/agents/testutil" + "github.com/synapsecns/sanguine/ethergo/backends/simulated" + "github.com/synapsecns/sanguine/ethergo/contracts" + "github.com/synapsecns/sanguine/services/scribe/backfill" + "github.com/synapsecns/sanguine/services/scribe/config" +) + +// TestConfirmations tests that data will not be added if a specified amount of blocks +// have not passed before the block that the data belongs to. +func (b BackfillSuite) TestConfirmations() { + // Get simulated blockchain, deploy three test contracts, and set up test variables. + simulatedChain := simulated.NewSimulatedBackendWithChainID(b.GetTestContext(), b.T(), big.NewInt(4)) + simulatedChain.FundAccount(b.GetTestContext(), b.wallet.Address(), *big.NewInt(params.Ether)) + testContract, testRef := b.manager.GetTestContract(b.GetTestContext(), simulatedChain) + // Create a second test contract just meant to pass blocks. + dummyManager := testutil.NewDeployManager(b.T()) + _, dummyRef := dummyManager.GetTestContract(b.GetTestContext(), simulatedChain) + transactOpts := simulatedChain.GetTxContext(b.GetTestContext(), nil) + // Set up the config. + deployTxHash := testContract.DeployTx().Hash() + receipt, err := simulatedChain.TransactionReceipt(b.GetTestContext(), deployTxHash) + Nil(b.T(), err) + startBlock := receipt.BlockNumber.Uint64() + contractConfigs := make(config.ContractConfigs) + contractConfigs["TestContract"] = config.ContractConfig{ + Address: testContract.Address().String(), + StartBlock: startBlock, + } + chainConfig := config.ChainConfig{ + ChainID: 4, + RPCUrl: "an rpc url is not needed for simulated backends", + ConfirmationThreshold: 2, + Contracts: contractConfigs, + } + + // Set up the ChainBackfiller. + chainBackfiller, err := backfill.NewChainBackfiller(b.testDB, simulatedChain, chainConfig) + Nil(b.T(), err) + + // Emit three events from two transactions. + tx, err := testRef.EmitEventA(transactOpts.TransactOpts, big.NewInt(1), big.NewInt(2), big.NewInt(3)) + Nil(b.T(), err) + simulatedChain.WaitForConfirmation(b.GetTestContext(), tx) + tx, err = testRef.EmitEventAandB(transactOpts.TransactOpts, big.NewInt(4), big.NewInt(5), big.NewInt(6)) + Nil(b.T(), err) + simulatedChain.WaitForConfirmation(b.GetTestContext(), tx) + + // Use the dummy contract to pass two blocks. + tx, err = dummyRef.EmitEventA(transactOpts.TransactOpts, big.NewInt(1), big.NewInt(2), big.NewInt(3)) + Nil(b.T(), err) + simulatedChain.WaitForConfirmation(b.GetTestContext(), tx) + tx, err = dummyRef.EmitEventA(transactOpts.TransactOpts, big.NewInt(1), big.NewInt(2), big.NewInt(3)) + Nil(b.T(), err) + simulatedChain.WaitForConfirmation(b.GetTestContext(), tx) + + // Backfill the first batch of events. + latestBlock, err := simulatedChain.BlockNumber(b.GetTestContext()) + Nil(b.T(), err) + err = chainBackfiller.Backfill(b.GetTestContext(), latestBlock-uint64(chainConfig.ConfirmationThreshold)) + Nil(b.T(), err) + + // Check that the first batch of events were added to the database. + logs, err := b.testDB.UnsafeRetrieveAllLogs(b.GetTestContext(), true, chainConfig.ChainID, testContract.Address()) + Nil(b.T(), err) + Equal(b.T(), 3, len(logs)) + + receipts, err := b.testDB.UnsafeRetrieveAllReceipts(b.GetTestContext(), true, chainConfig.ChainID) + Nil(b.T(), err) + Equal(b.T(), 2, len(receipts)) + + // Send one more transaction. + tx, err = testRef.EmitEventB(transactOpts.TransactOpts, []byte{7}, big.NewInt(8), big.NewInt(9)) + Nil(b.T(), err) + simulatedChain.WaitForConfirmation(b.GetTestContext(), tx) + + // Backfill before the confirmation threshold has passed. + latestBlock, err = simulatedChain.BlockNumber(b.GetTestContext()) + Nil(b.T(), err) + err = chainBackfiller.Backfill(b.GetTestContext(), latestBlock-uint64(chainConfig.ConfirmationThreshold)) + Nil(b.T(), err) + + // Check that the second batch of events were not added to the database. + logs, err = b.testDB.UnsafeRetrieveAllLogs(b.GetTestContext(), true, chainConfig.ChainID, testContract.Address()) + Nil(b.T(), err) + Equal(b.T(), 3, len(logs)) + + receipts, err = b.testDB.UnsafeRetrieveAllReceipts(b.GetTestContext(), true, chainConfig.ChainID) + Nil(b.T(), err) + Equal(b.T(), 2, len(receipts)) +} + +// TestChainBackfill tests the ChainBackfiller's ability to backfill a chain. +func (b BackfillSuite) TestChainBackfill() { + // We need to set up multiple deploy managers, one for each contract. We will use + // b.manager for the first contract, and create a new ones for the next two. + managerB := testutil.NewDeployManager(b.T()) + managerC := testutil.NewDeployManager(b.T()) + // Get simulated blockchain, deploy three test contracts, and set up test variables. + simulatedChain := simulated.NewSimulatedBackendWithChainID(b.GetTestContext(), b.T(), big.NewInt(1)) + simulatedChain.FundAccount(b.GetTestContext(), b.wallet.Address(), *big.NewInt(params.Ether)) + testContractA, testRefA := b.manager.GetTestContract(b.GetTestContext(), simulatedChain) + testContractB, testRefB := managerB.GetTestContract(b.GetTestContext(), simulatedChain) + testContractC, testRefC := managerC.GetTestContract(b.GetTestContext(), simulatedChain) + transactOpts := simulatedChain.GetTxContext(b.GetTestContext(), nil) + // Put the contracts into a slice so we can iterate over them. + contracts := []contracts.DeployedContract{testContractA, testContractB, testContractC} + // Put the test refs into a slice so we can iterate over them. + testRefs := []*testcontract.TestContractRef{testRefA, testRefB, testRefC} + // Emit events from each contract. + for _, testRef := range testRefs { + tx, err := testRef.EmitEventA(transactOpts.TransactOpts, big.NewInt(1), big.NewInt(2), big.NewInt(3)) + Nil(b.T(), err) + simulatedChain.WaitForConfirmation(b.GetTestContext(), tx) + tx, err = testRef.EmitEventB(transactOpts.TransactOpts, []byte{4}, big.NewInt(5), big.NewInt(6)) + Nil(b.T(), err) + simulatedChain.WaitForConfirmation(b.GetTestContext(), tx) + tx, err = testRef.EmitEventAandB(transactOpts.TransactOpts, big.NewInt(7), big.NewInt(8), big.NewInt(9)) + Nil(b.T(), err) + simulatedChain.WaitForConfirmation(b.GetTestContext(), tx) + } + + startBlocks := make([]uint64, len(contracts)) + for i, contract := range contracts { + deployTxHash := contract.DeployTx().Hash() + receipt, err := simulatedChain.TransactionReceipt(b.GetTestContext(), deployTxHash) + Nil(b.T(), err) + startBlocks[i] = receipt.BlockNumber.Uint64() + } + // Set up the ChainConfig for the backfiller. + contractConfigs := make(config.ContractConfigs) + contractConfigs["TestContractA"] = config.ContractConfig{ + Address: testContractA.Address().String(), + StartBlock: startBlocks[0], + } + contractConfigs["TestContractB"] = config.ContractConfig{ + Address: testContractB.Address().String(), + StartBlock: startBlocks[1], + } + contractConfigs["TestContractC"] = config.ContractConfig{ + Address: testContractC.Address().String(), + StartBlock: startBlocks[2], + } + chainConfig := config.ChainConfig{ + ChainID: 1, + RPCUrl: "an rpc url is not needed for simulated backends", + ConfirmationThreshold: 0, + Contracts: contractConfigs, + } + + // Set up the ChainBackfiller. + chainBackfiller, err := backfill.NewChainBackfiller(b.testDB, simulatedChain, chainConfig) + Nil(b.T(), err) + // Backfill the chain. + lastBlock, err := simulatedChain.BlockNumber(b.GetTestContext()) + Nil(b.T(), err) + err = chainBackfiller.Backfill(b.GetTestContext(), lastBlock) + Nil(b.T(), err) + + // Check that the events were written to the database. + for _, contract := range contracts { + // Check the storage of logs. + logs, err := b.testDB.UnsafeRetrieveAllLogs(b.GetTestContext(), true, chainConfig.ChainID, contract.Address()) + Nil(b.T(), err) + // There should be 4 logs. One from `EmitEventA`, one from `EmitEventB`, and two + // from `EmitEventAandB`. + Equal(b.T(), 4, len(logs)) + } + // Check the storage of receipts. + receipts, err := b.testDB.UnsafeRetrieveAllReceipts(b.GetTestContext(), true, chainConfig.ChainID) + Nil(b.T(), err) + // There should be 9 receipts. One from `EmitEventA`, one from `EmitEventB`, and + // one from `EmitEventAandB`, for each contract. + Equal(b.T(), 9, len(receipts)) +} diff --git a/services/scribe/backfill/contract.go b/services/scribe/backfill/contract.go index def9df198b..1cce360940 100644 --- a/services/scribe/backfill/contract.go +++ b/services/scribe/backfill/contract.go @@ -4,10 +4,12 @@ import ( "context" "fmt" "math/big" + "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" lru "github.com/hashicorp/golang-lru" - "github.com/synapsecns/sanguine/ethergo/contracts" + "github.com/jpillora/backoff" "github.com/synapsecns/sanguine/services/scribe/db" "github.com/synapsecns/synapse-node/pkg/evm/client" "golang.org/x/sync/errgroup" @@ -15,8 +17,12 @@ import ( // ContractBackfiller is a backfiller that fetches logs for a specific contract. type ContractBackfiller struct { - // contract is the contract to get logs for - contract contracts.DeployedContract + // mapName is the name used in the config for name->contract + mapName string + // chainID is the chainID of the chain the contract is deployed on + chainID uint32 + // address is the contract address to get logs for + address string // eventDB is the database to store event data in eventDB db.EventDB // client is the client for filtering @@ -26,7 +32,7 @@ type ContractBackfiller struct { } // NewContractBackfiller creates a new backfiller for a contract. -func NewContractBackfiller(contract contracts.DeployedContract, eventDB db.EventDB, client client.EVMClient) (*ContractBackfiller, error) { +func NewContractBackfiller(mapName string, chainID uint32, address string, eventDB db.EventDB, client client.EVMClient) (*ContractBackfiller, error) { // initialize the cache for the txHashes cache, err := lru.New(500) if err != nil { @@ -34,10 +40,12 @@ func NewContractBackfiller(contract contracts.DeployedContract, eventDB db.Event } return &ContractBackfiller{ - contract: contract, - eventDB: eventDB, - client: client, - cache: cache, + mapName: mapName, + chainID: chainID, + address: address, + eventDB: eventDB, + client: client, + cache: cache, }, nil } @@ -60,19 +68,36 @@ func (c ContractBackfiller) Backfill(ctx context.Context, givenStart uint64, end // start listening for logs g, ctx := errgroup.WithContext(ctx) g.Go(func() error { + // backoff in the case of an error + b := &backoff.Backoff{ + Factor: 2, + Jitter: true, + Min: 1 * time.Second, + Max: 30 * time.Second, + } + // timeout should always be 0 on the first attempt + timeout := time.Duration(0) for { select { case <-ctx.Done(): return nil case log := <-logChan: + // TODO: add a notification for failure to store + // wait the timeout (will be 0 on first attempt) + time.Sleep(timeout) // check if the txHash has already been stored in the cache if _, ok := c.cache.Get(log.TxHash); ok { continue } err = c.Store(ctx, log) if err != nil { - return fmt.Errorf("could not store data: %w", err) + timeout = b.Duration() + logger.Warnf("could not store data: %w", err) + continue } + // if everything works properly, restore timeout to 0 + timeout = time.Duration(0) + b.Reset() case err := <-errChan: return fmt.Errorf("could not get logs: %w", err) case <-doneChan: @@ -109,7 +134,7 @@ func (c ContractBackfiller) Store(ctx context.Context, log types.Log) error { if log == nil { return fmt.Errorf("log is nil") } - err = c.eventDB.StoreLog(groupCtx, *log, uint32(c.contract.ChainID().Uint64())) + err = c.eventDB.StoreLog(groupCtx, *log, c.chainID) if err != nil { return fmt.Errorf("could not store log: %w", err) } @@ -119,7 +144,7 @@ func (c ContractBackfiller) Store(ctx context.Context, log types.Log) error { g.Go(func() error { // store the receipt in the db - err = c.eventDB.StoreReceipt(groupCtx, *receipt, uint32(c.contract.ChainID().Uint64())) + err = c.eventDB.StoreReceipt(groupCtx, *receipt, c.chainID) if err != nil { return fmt.Errorf("could not store receipt: %w", err) } @@ -135,7 +160,7 @@ func (c ContractBackfiller) Store(ctx context.Context, log types.Log) error { if isPending { return fmt.Errorf("transaction is pending") } - err = c.eventDB.StoreEthTx(groupCtx, txn, uint32(c.contract.ChainID().Uint64())) + err = c.eventDB.StoreEthTx(groupCtx, txn, c.chainID) if err != nil { return fmt.Errorf("could not store transaction: %w", err) } @@ -148,7 +173,7 @@ func (c ContractBackfiller) Store(ctx context.Context, log types.Log) error { } // store the last indexed block in the db - err = c.eventDB.StoreLastIndexed(ctx, c.contract.Address(), uint32(c.contract.ChainID().Uint64()), receipt.BlockNumber.Uint64()) + err = c.eventDB.StoreLastIndexed(ctx, common.HexToAddress(c.address), c.chainID, receipt.BlockNumber.Uint64()) if err != nil { return fmt.Errorf("could not store last indexed block: %w", err) } @@ -170,7 +195,7 @@ func (c ContractBackfiller) GetLogs(ctx context.Context, startHeight, endHeight doneChan := make(chan bool) // start the filterer. This filters the range and sends the logs to the logChan. - rangeFilter := NewRangeFilter(c.contract.Address(), c.client, big.NewInt(int64(startHeight)), big.NewInt(int64(endHeight)), chunkSize, true) + rangeFilter := NewRangeFilter(common.HexToAddress(c.address), c.client, big.NewInt(int64(startHeight)), big.NewInt(int64(endHeight)), chunkSize, true) g, ctx := errgroup.WithContext(ctx) g.Go(func() error { // start the range filterer, return any errors to an error channel @@ -212,7 +237,7 @@ func (c ContractBackfiller) GetLogs(ctx context.Context, startHeight, endHeight // StartHeightForBackfill gets the startHeight for backfilling. This is the maximum // of the most recent block for the contract and the startHeight given in the config. func (c ContractBackfiller) StartHeightForBackfill(ctx context.Context, givenStart uint64) (startHeight uint64, err error) { - lastBlock, err := c.eventDB.RetrieveLastIndexed(ctx, c.contract.Address(), uint32(c.contract.ChainID().Uint64())) + lastBlock, err := c.eventDB.RetrieveLastIndexed(ctx, common.HexToAddress(c.address), c.chainID) if err != nil { return 0, fmt.Errorf("could not retrieve last indexed block for contract: %w", err) } diff --git a/services/scribe/backfill/contract_test.go b/services/scribe/backfill/contract_test.go index 581173d1d1..0728b1e186 100644 --- a/services/scribe/backfill/contract_test.go +++ b/services/scribe/backfill/contract_test.go @@ -2,9 +2,12 @@ package backfill_test import ( "fmt" - "github.com/synapsecns/sanguine/services/scribe/backfill" "math/big" + "github.com/synapsecns/sanguine/services/scribe/backfill" + "github.com/synapsecns/sanguine/services/scribe/config" + + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/params" . "github.com/stretchr/testify/assert" @@ -16,12 +19,18 @@ import ( //nolint:cyclop func (b BackfillSuite) TestGetLogsSimulated() { // Get simulated blockchain, deploy the test contract, and set up test variables. - simulatedChain := simulated.NewSimulatedBackend(b.GetSuiteContext(), b.T()) + simulatedChain := simulated.NewSimulatedBackendWithChainID(b.GetSuiteContext(), b.T(), big.NewInt(3)) simulatedChain.FundAccount(b.GetTestContext(), b.wallet.Address(), *big.NewInt(params.Ether)) testContract, testRef := b.manager.GetTestContract(b.GetTestContext(), simulatedChain) transactOpts := simulatedChain.GetTxContext(b.GetTestContext(), nil) - backfiller, err := backfill.NewContractBackfiller(testContract, b.testDB, simulatedChain) + // Set config. + contractConfig := config.ContractConfig{ + Address: testContract.Address().String(), + StartBlock: 0, + } + + backfiller, err := backfill.NewContractBackfiller("test", 3, contractConfig.Address, b.testDB, simulatedChain) Nil(b.T(), err) // Emit five events, and then fetch them with GetLogs. The first two will be fetched first, @@ -51,7 +60,7 @@ func (b BackfillSuite) TestGetLogsSimulated() { // Get the logs for the first two events. collectedLogs := []types.Log{} - logs, errors, done := backfiller.GetLogs(b.GetTestContext(), 0, txBlockNumberA) + logs, errors, done := backfiller.GetLogs(b.GetTestContext(), contractConfig.StartBlock, txBlockNumberA) for { select { case <-b.GetTestContext().Done(): @@ -88,15 +97,21 @@ Done: Equal(b.T(), 3, len(collectedLogs)) } -// TestBackfill tests using a contractBackfiller for recording receipts and logs in a database. -func (b BackfillSuite) TestBackfill() { +// TestContractBackfill tests using a contractBackfiller for recording receipts and logs in a database. +func (b BackfillSuite) TestContractBackfill() { // Get simulated blockchain, deploy the test contract, and set up test variables. - simulatedChain := simulated.NewSimulatedBackend(b.GetSuiteContext(), b.T()) + simulatedChain := simulated.NewSimulatedBackendWithChainID(b.GetSuiteContext(), b.T(), big.NewInt(142)) simulatedChain.FundAccount(b.GetTestContext(), b.wallet.Address(), *big.NewInt(params.Ether)) testContract, testRef := b.manager.GetTestContract(b.GetTestContext(), simulatedChain) transactOpts := simulatedChain.GetTxContext(b.GetTestContext(), nil) - backfiller, err := backfill.NewContractBackfiller(testContract, b.testDB, simulatedChain) + // Set config. + contractConfig := config.ContractConfig{ + Address: testContract.Address().String(), + StartBlock: 0, + } + + backfiller, err := backfill.NewContractBackfiller("test", 142, contractConfig.Address, b.testDB, simulatedChain) Nil(b.T(), err) // Emit events for the backfiller to read. @@ -115,15 +130,15 @@ func (b BackfillSuite) TestBackfill() { txBlockNumber, err := b.getTxBlockNumber(simulatedChain, tx) Nil(b.T(), err) // Backfill the events. The `0` will be replaced with the startBlock from the config. - err = backfiller.Backfill(b.GetTestContext(), 0, txBlockNumber) + err = backfiller.Backfill(b.GetTestContext(), contractConfig.StartBlock, txBlockNumber) Nil(b.T(), err) // Get all receipts. - receipts, err := b.testDB.RetrieveAllReceipts_Test(b.GetTestContext()) + receipts, err := b.testDB.UnsafeRetrieveAllReceipts(b.GetTestContext(), false, 0) Nil(b.T(), err) // Check to see if 3 receipts were collected. Equal(b.T(), 3, len(receipts)) // Get all logs. - logs, err := b.testDB.RetrieveAllLogs_Test(b.GetTestContext()) + logs, err := b.testDB.UnsafeRetrieveAllLogs(b.GetTestContext(), false, 0, common.Address{}) Nil(b.T(), err) // Check to see if 4 logs were collected. Equal(b.T(), 4, len(logs)) diff --git a/services/scribe/db/datastore/sql/base/log.go b/services/scribe/db/datastore/sql/base/log.go index e70c7033ac..97fc1238d3 100644 --- a/services/scribe/db/datastore/sql/base/log.go +++ b/services/scribe/db/datastore/sql/base/log.go @@ -5,12 +5,14 @@ import ( "database/sql" "errors" "fmt" - "github.com/synapsecns/sanguine/agents/db" "sort" + "github.com/synapsecns/sanguine/agents/db" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "gorm.io/gorm" + "gorm.io/gorm/clause" ) // StoreLog stores a log. @@ -34,21 +36,28 @@ func (s Store) StoreLog(ctx context.Context, log types.Log, chainID uint32) erro }) } } - dbTx := s.DB().WithContext(ctx).Create(&Log{ - ContractAddress: log.Address.String(), - ChainID: chainID, - PrimaryTopic: topics[0], - TopicA: topics[1], - TopicB: topics[2], - TopicC: topics[3], - Data: log.Data, - BlockNumber: log.BlockNumber, - TxHash: log.TxHash.String(), - TxIndex: uint64(log.TxIndex), - BlockHash: log.BlockHash.String(), - Index: uint64(log.Index), - Removed: log.Removed, - }) + dbTx := s.DB().WithContext(ctx). + Clauses(clause.OnConflict{ + Columns: []clause.Column{ + {Name: ContractAddressFieldName}, {Name: ChainIDFieldName}, {Name: TxHashFieldName}, {Name: IndexFieldName}, + }, + DoNothing: true, + }). + Create(&Log{ + ContractAddress: log.Address.String(), + ChainID: chainID, + PrimaryTopic: topics[0], + TopicA: topics[1], + TopicB: topics[2], + TopicC: topics[3], + Data: log.Data, + BlockNumber: log.BlockNumber, + TxHash: log.TxHash.String(), + TxIndex: uint64(log.TxIndex), + BlockHash: log.BlockHash.String(), + Index: uint64(log.Index), + Removed: log.Removed, + }) if dbTx.Error != nil { return fmt.Errorf("could not store log: %w", dbTx.Error) @@ -100,14 +109,24 @@ func (s Store) RetrieveLogs(ctx context.Context, txHash common.Hash, chainID uin return logs, nil } -// RetrieveAllLogs_Test retrieves all logs in the database. This is only used for testing. -// -//nolint:golint, revive, stylecheck -func (s Store) RetrieveAllLogs_Test(ctx context.Context) (logs []*types.Log, err error) { +// UnsafeRetrieveAllLogs retrieves all logs in the database. When true, `specific` lets +// you specify a chainID and contract address to specifically search for. This is only used for testing. +func (s Store) UnsafeRetrieveAllLogs(ctx context.Context, specific bool, chainID uint32, address common.Address) (logs []*types.Log, err error) { dbLogs := []Log{} - dbTx := s.DB().WithContext(ctx). - Model(&Log{}). - Find(&dbLogs) + var dbTx *gorm.DB + if specific { + dbTx = s.DB().WithContext(ctx). + Model(&Log{}). + Where(&Log{ + ChainID: chainID, + ContractAddress: address.String(), + }). + Find(&dbLogs) + } else { + dbTx = s.DB().WithContext(ctx). + Model(&Log{}). + Find(&dbLogs) + } if dbTx.Error != nil { if errors.Is(dbTx.Error, gorm.ErrRecordNotFound) { diff --git a/services/scribe/db/datastore/sql/base/model.go b/services/scribe/db/datastore/sql/base/model.go index 9752f2f8be..36f7d1e26a 100644 --- a/services/scribe/db/datastore/sql/base/model.go +++ b/services/scribe/db/datastore/sql/base/model.go @@ -16,6 +16,7 @@ func init() { ChainIDFieldName = namer.GetConsistentName("ChainID") BlockNumberFieldName = namer.GetConsistentName("BlockNumber") ContractAddressFieldName = namer.GetConsistentName("ContractAddress") + IndexFieldName = namer.GetConsistentName("Index") } var ( @@ -27,6 +28,8 @@ var ( BlockNumberFieldName string // ContractAddressFieldName is the address of the contract. ContractAddressFieldName string + // IndexFieldName is the index field name. + IndexFieldName string ) // Log stores the log of an event. @@ -34,7 +37,7 @@ type Log struct { // ContractAddress is the address of the contract that generated the event ContractAddress string `gorm:"column:contract_address;primaryKey"` // ChainID is the chain id of the contract that generated the event - ChainID uint32 `gorm:"chain_id"` + ChainID uint32 `gorm:"column:chain_id;primaryKey;auto_increment:false"` // PrimaryTopic is the primary topic of the event. Topics[0] PrimaryTopic sql.NullString `gorm:"primary_topic"` // TopicA is the first topic. Topics[1] @@ -46,7 +49,7 @@ type Log struct { // Data is the data provided by the contract Data []byte `gorm:"data"` // BlockNumber is the block in which the transaction was included - BlockNumber uint64 `gorm:"block_number"` + BlockNumber uint64 `gorm:"column:block_number"` // TxHash is the hash of the transaction TxHash string `gorm:"column:tx_hash;primaryKey"` // TxIndex is the index of the transaction in the block @@ -76,7 +79,7 @@ type Receipt struct { // TxHash is the hash of the transaction TxHash string `gorm:"column:tx_hash;primaryKey"` // ContractAddress is the address of the contract - ContractAddress string `gorm:"contract_address"` + ContractAddress string `gorm:"column:contract_address"` // GasUsed is the amount of gas used by this transaction alone GasUsed uint64 `gorm:"gas_used"` // BlockHash is the hash of the block in which this transaction was included diff --git a/services/scribe/db/datastore/sql/base/receipt.go b/services/scribe/db/datastore/sql/base/receipt.go index 441e2bc56c..c561969c2d 100644 --- a/services/scribe/db/datastore/sql/base/receipt.go +++ b/services/scribe/db/datastore/sql/base/receipt.go @@ -11,24 +11,30 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "gorm.io/gorm" + "gorm.io/gorm/clause" ) // StoreReceipt stores a receipt. func (s Store) StoreReceipt(ctx context.Context, receipt types.Receipt, chainID uint32) error { - dbTx := s.DB().WithContext(ctx).Create(&Receipt{ - ChainID: chainID, - Type: receipt.Type, - PostState: receipt.PostState, - Status: receipt.Status, - CumulativeGasUsed: receipt.CumulativeGasUsed, - Bloom: receipt.Bloom.Bytes(), - TxHash: receipt.TxHash.String(), - ContractAddress: receipt.ContractAddress.String(), - GasUsed: receipt.GasUsed, - BlockHash: receipt.BlockHash.String(), - BlockNumber: receipt.BlockNumber.Uint64(), - TransactionIndex: uint64(receipt.TransactionIndex), - }) + dbTx := s.DB().WithContext(ctx). + Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: TxHashFieldName}, {Name: ChainIDFieldName}}, + DoNothing: true, + }). + Create(&Receipt{ + ChainID: chainID, + Type: receipt.Type, + PostState: receipt.PostState, + Status: receipt.Status, + CumulativeGasUsed: receipt.CumulativeGasUsed, + Bloom: receipt.Bloom.Bytes(), + TxHash: receipt.TxHash.String(), + ContractAddress: receipt.ContractAddress.String(), + GasUsed: receipt.GasUsed, + BlockHash: receipt.BlockHash.String(), + BlockNumber: receipt.BlockNumber.Uint64(), + TransactionIndex: uint64(receipt.TransactionIndex), + }) if dbTx.Error != nil { return fmt.Errorf("could not store receipt: %w", dbTx.Error) @@ -76,12 +82,18 @@ func (s Store) RetrieveReceipt(ctx context.Context, txHash common.Hash, chainID return parsedReceipt, nil } -// RetrieveAllReceipts_Test retrieves all receipts. Should only be used for testing. -// -//nolint:golint, revive, stylecheck -func (s Store) RetrieveAllReceipts_Test(ctx context.Context) (receipts []types.Receipt, err error) { +// UnsafeRetrieveAllReceipts retrieves all receipts in the database. When `specific` is true, you can specify +// a chainID to specifically search for. This is only used for testing. +func (s Store) UnsafeRetrieveAllReceipts(ctx context.Context, specific bool, chainID uint32) (receipts []*types.Receipt, err error) { dbReceipts := []Receipt{} - dbTx := s.DB().WithContext(ctx).Model(&Receipt{}).Find(&dbReceipts) + var dbTx *gorm.DB + if specific { + dbTx = s.DB().WithContext(ctx).Model(&Receipt{}).Where(&Receipt{ + ChainID: chainID, + }).Find(&dbReceipts) + } else { + dbTx = s.DB().WithContext(ctx).Model(&Receipt{}).Find(&dbReceipts) + } if dbTx.Error != nil { return nil, fmt.Errorf("could not retrieve receipts: %w", dbTx.Error) @@ -94,7 +106,7 @@ func (s Store) RetrieveAllReceipts_Test(ctx context.Context) (receipts []types.R return nil, fmt.Errorf("could not retrieve logs with tx hash %s and chain id %d: %w", dbReceipt.TxHash, dbReceipt.ChainID, err) } - parsedReceipt := types.Receipt{ + parsedReceipt := &types.Receipt{ Type: dbReceipt.Type, PostState: dbReceipt.PostState, Status: dbReceipt.Status, diff --git a/services/scribe/db/datastore/sql/base/transaction.go b/services/scribe/db/datastore/sql/base/transaction.go index 99c18fd7eb..04bb3f0fc1 100644 --- a/services/scribe/db/datastore/sql/base/transaction.go +++ b/services/scribe/db/datastore/sql/base/transaction.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/ethereum/go-ethereum/core/types" + "gorm.io/gorm/clause" ) // StoreEthTx stores a processed text. @@ -14,13 +15,18 @@ func (s Store) StoreEthTx(ctx context.Context, tx *types.Transaction, chainID ui return fmt.Errorf("could not marshall tx to binary: %w", err) } - dbTx := s.DB().WithContext(ctx).Create(&EthTx{ - TxHash: tx.Hash().String(), - ChainID: chainID, - RawTx: marshalledTx, - GasFeeCap: tx.GasFeeCap().Uint64(), - GasTipCap: tx.GasTipCap().Uint64(), - }) + dbTx := s.DB().WithContext(ctx). + Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: TxHashFieldName}, {Name: ChainIDFieldName}}, + DoNothing: true, + }). + Create(&EthTx{ + TxHash: tx.Hash().String(), + ChainID: chainID, + RawTx: marshalledTx, + GasFeeCap: tx.GasFeeCap().Uint64(), + GasTipCap: tx.GasTipCap().Uint64(), + }) if dbTx.Error != nil { return fmt.Errorf("could not create raw tx: %w", dbTx.Error) diff --git a/services/scribe/db/event.go b/services/scribe/db/event.go index 9b71bf0132..223dd81bee 100644 --- a/services/scribe/db/event.go +++ b/services/scribe/db/event.go @@ -13,14 +13,16 @@ type EventDB interface { StoreLog(ctx context.Context, log types.Log, chainID uint32) error // RetrieveLogs retrieves logs that match a tx hash and chain id RetrieveLogs(ctx context.Context, txHash common.Hash, chainID uint32) (logs []*types.Log, err error) - // RetrieveAllLogs_Test retrieves all logs in the database. This is only used for testing. - RetrieveAllLogs_Test(ctx context.Context) (logs []*types.Log, err error) + // UnsafeRetrieveAllLogs retrieves all logs in the database. When `specific` is true, you can specify + // a chainID and contract address to specifically search for. This is only used for testing. + UnsafeRetrieveAllLogs(ctx context.Context, specific bool, chainID uint32, address common.Address) (logs []*types.Log, err error) // StoreReceipt stores a receipt StoreReceipt(ctx context.Context, receipt types.Receipt, chainID uint32) error // RetrieveReceipt retrieves a receipt by tx hash and chain id RetrieveReceipt(ctx context.Context, txHash common.Hash, chainID uint32) (receipt types.Receipt, err error) - // RetrieveAllReceipts_Test retrieves all receipts in the database. This is only used for testing. - RetrieveAllReceipts_Test(ctx context.Context) (receipts []types.Receipt, err error) + // UnsafeRetrieveAllReceipts retrieves all receipts in the database. When `specific` is true, you can specify + // a chainID to specifically search for. This is only used for testing. + UnsafeRetrieveAllReceipts(ctx context.Context, specific bool, chainID uint32) (receipts []*types.Receipt, err error) // StoreEthTx stores a processed transaction StoreEthTx(ctx context.Context, tx *types.Transaction, chainID uint32) error // StoreLastIndexed stores the last indexed for a contract address diff --git a/services/scribe/db/log_test.go b/services/scribe/db/log_test.go index 895dea6c75..03031ab436 100644 --- a/services/scribe/db/log_test.go +++ b/services/scribe/db/log_test.go @@ -1,9 +1,9 @@ package db_test import ( - "github.com/synapsecns/sanguine/services/scribe/db" "math/big" - "time" + + "github.com/synapsecns/sanguine/services/scribe/db" "github.com/brianvoe/gofakeit/v6" "github.com/ethereum/go-ethereum/common" @@ -30,7 +30,6 @@ func (t *DBSuite) TestStoreRetrieveLog() { logC := t.MakeRandomLog(txHashC) err = testDB.StoreLog(t.GetTestContext(), logC, chainID+1) Nil(t.T(), err) - time.Sleep(1 * time.Second) // Ensure the logs from the database match the ones stored. // Check the logs for the two with the same txHash. @@ -60,7 +59,7 @@ func (t *DBSuite) TestStoreRetrieveLog() { Equal(t.T(), resA, resB) // Check if `RetrieveAllLogs` returns all the logs. - allLogs, err := testDB.RetrieveAllLogs_Test(t.GetTestContext()) + allLogs, err := testDB.UnsafeRetrieveAllLogs(t.GetTestContext(), false, 0, common.Address{}) Nil(t.T(), err) Equal(t.T(), len(allLogs), 3) }) diff --git a/services/scribe/db/receipt_test.go b/services/scribe/db/receipt_test.go index 2048ea02a6..5d20e9f5fe 100644 --- a/services/scribe/db/receipt_test.go +++ b/services/scribe/db/receipt_test.go @@ -1,12 +1,13 @@ package db_test import ( + "math/big" + "github.com/brianvoe/gofakeit/v6" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" . "github.com/stretchr/testify/assert" "github.com/synapsecns/sanguine/services/scribe/db" - "math/big" ) func (t *DBSuite) TestStoreRetrieveReceipt() { @@ -46,6 +47,7 @@ func (t *DBSuite) TestStoreRetrieveReceipt() { &randomLogsA[1], }, TxHash: txHashA, + ContractAddress: common.BigToAddress(big.NewInt(gofakeit.Int64())), GasUsed: gofakeit.Uint64(), BlockNumber: big.NewInt(int64(gofakeit.Uint32())), TransactionIndex: uint(gofakeit.Uint64()), @@ -64,6 +66,7 @@ func (t *DBSuite) TestStoreRetrieveReceipt() { &randomLogsB[1], }, TxHash: txHashB, + ContractAddress: common.BigToAddress(big.NewInt(gofakeit.Int64())), GasUsed: gofakeit.Uint64(), BlockNumber: big.NewInt(int64(gofakeit.Uint32())), TransactionIndex: uint(gofakeit.Uint64()), @@ -91,7 +94,7 @@ func (t *DBSuite) TestStoreRetrieveReceipt() { Equal(t.T(), resA, resB) // Ensure RetrieveAllReceipts gets all receipts. - allReceipts, err := testDB.RetrieveAllReceipts_Test(t.GetTestContext()) + allReceipts, err := testDB.UnsafeRetrieveAllReceipts(t.GetTestContext(), false, 0) Nil(t.T(), err) Equal(t.T(), 2, len(allReceipts)) }) diff --git a/services/scribe/go.mod b/services/scribe/go.mod index d78400251d..3ca0e29395 100644 --- a/services/scribe/go.mod +++ b/services/scribe/go.mod @@ -13,7 +13,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/richardwilkes/toolbox v1.52.0 github.com/stretchr/testify v1.8.0 - github.com/synapsecns/sanguine/agents v0.0.0-20220824000943-9abcce41dac4 + github.com/synapsecns/sanguine/agents v0.0.2 github.com/synapsecns/sanguine/core v0.0.0-20220823193711-904c560fc7d3 github.com/synapsecns/sanguine/ethergo v0.0.0-00010101000000-000000000000 github.com/synapsecns/synapse-node v0.242.0 diff --git a/services/scribe/go.sum b/services/scribe/go.sum index 1d531e060c..3efe3207c3 100644 --- a/services/scribe/go.sum +++ b/services/scribe/go.sum @@ -1,4 +1,5 @@ bazil.org/fuse v0.0.0-20200407214033-5883e5a4b512/go.mod h1:FbcW6z/2VytnFDhZfumh8Ss8zxHE6qpMP5sHTRe0EaM= +bitbucket.org/tentontrain/math v0.0.0-20220519191623-a4e86beba92a h1:6QCkYok6wNGonv0ya01Ay5uV8zT412p4wm2stFZsUQM= bou.ke/monkey v1.0.1/go.mod h1:FgHuK96Rv2Nlf+0u1OOVDpCMdsWyOFmeeketDHE7LIg= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.31.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= @@ -1715,8 +1716,8 @@ github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PK github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= -github.com/synapsecns/sanguine/agents v0.0.0-20220824000943-9abcce41dac4 h1:9V1cPBSqJgo5QoZ5Gcx32TmVFEXz1JPAugKQqEIJJ+w= -github.com/synapsecns/sanguine/agents v0.0.0-20220824000943-9abcce41dac4/go.mod h1:ZIkfPy/nmw5sPvGIX1/D3FF5bDPPha5xjiY9r0FOzBE= +github.com/synapsecns/sanguine/agents v0.0.2 h1:/LhQNaYfW5RopjyfepAC+0EMDrkdi+n3JDyU/H+9C0A= +github.com/synapsecns/sanguine/agents v0.0.2/go.mod h1:24YFRxTwdBrHOUV0PWNYfTku1xyCUrF+ikPjKLQfTqw= github.com/synapsecns/synapse-node v0.242.1-0.20220523175312-65a2f2613b1f h1:F2+tY8Y1n1Qn77mROlRcqV+k987FJNTwL4k48s4pck0= github.com/synapsecns/synapse-node v0.242.1-0.20220523175312-65a2f2613b1f/go.mod h1:5YlKLpGdk5NpjbEguajL2BA8xFvY4EqL1asSpUm8+4o= github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= @@ -1939,6 +1940,7 @@ golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= golang.org/x/exp v0.0.0-20200331195152-e8c3332aa8e5/go.mod h1:4M0jN8W1tt0AVLNr8HDosyJCDCDuyL9N9+3m7wDWgKw= +golang.org/x/exp v0.0.0-20220823124025-807a23277127 h1:S4NrSKDfihhl3+4jSTgwoIevKxX9p7Iv9x++OEIptDo= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=