diff --git a/cmd/serve.go b/cmd/serve.go index 45aff2898..ec662cfcb 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -25,15 +25,14 @@ import ( "sync" "time" - "github.com/mailgun/groupcache/v2" "github.com/cerc-io/ipld-eth-server/v4/pkg/log" "github.com/ethereum/go-ethereum/rpc" + "github.com/mailgun/groupcache/v2" "github.com/spf13/cobra" "github.com/spf13/viper" "github.com/vulcanize/gap-filler/pkg/mux" - "github.com/cerc-io/ipld-eth-server/v4/pkg/eth" "github.com/cerc-io/ipld-eth-server/v4/pkg/graphql" srpc "github.com/cerc-io/ipld-eth-server/v4/pkg/rpc" s "github.com/cerc-io/ipld-eth-server/v4/pkg/serve" @@ -59,7 +58,6 @@ var serveCmd = &cobra.Command{ func serve() { logWithCommand.Infof("running ipld-eth-server version: %s", v.VersionWithMeta) - var forwardPayloadChan chan eth.ConvertedPayload wg := new(sync.WaitGroup) logWithCommand.Debug("loading server configuration variables") serverConfig, err := s.NewConfig() @@ -74,8 +72,7 @@ func serve() { } logWithCommand.Info("starting up server servers") - forwardPayloadChan = make(chan eth.ConvertedPayload, s.PayloadChanBufferSize) - server.Serve(wg, forwardPayloadChan) + server.Serve(wg) if err := startServers(server, serverConfig); err != nil { logWithCommand.Fatal(err) } diff --git a/cmd/subscribe.go b/cmd/subscribe.go deleted file mode 100644 index 155aebbcd..000000000 --- a/cmd/subscribe.go +++ /dev/null @@ -1,172 +0,0 @@ -// Copyright © 2019 Vulcanize, Inc -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package cmd - -import ( - "bytes" - "fmt" - - "github.com/cerc-io/ipld-eth-server/v4/pkg/log" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/rlp" - "github.com/ethereum/go-ethereum/rpc" - "github.com/spf13/cobra" - "github.com/spf13/viper" - - "github.com/cerc-io/ipld-eth-server/v4/pkg/client" - "github.com/cerc-io/ipld-eth-server/v4/pkg/eth" - w "github.com/cerc-io/ipld-eth-server/v4/pkg/serve" -) - -// subscribeCmd represents the subscribe command -var subscribeCmd = &cobra.Command{ - Use: "subscribe", - Short: "This command is used to subscribe to the eth ipfs watcher data stream with the provided filters", - Long: `This command is for demo and testing purposes and is used to subscribe to the watcher with the provided subscription configuration parameters. -It does not do anything with the data streamed from the watcher other than unpack it and print it out for demonstration purposes.`, - Run: func(cmd *cobra.Command, args []string) { - subCommand = cmd.CalledAs() - logWithCommand = *log.WithField("SubCommand", subCommand) - subscribe() - }, -} - -func init() { - rootCmd.AddCommand(subscribeCmd) -} - -func subscribe() { - // Prep the subscription config/filters to be sent to the server - ethSubConfig, err := eth.NewEthSubscriptionConfig() - if err != nil { - log.Fatal(err) - } - - // Create a new rpc client and a subscription streamer with that client - rpcClient, err := getRPCClient() - if err != nil { - logWithCommand.Fatal(err) - } - subClient := client.NewClient(rpcClient) - - // Buffered channel for reading subscription payloads - payloadChan := make(chan w.SubscriptionPayload, 20000) - - // Subscribe to the watcher service with the given config/filter parameters - sub, err := subClient.Stream(payloadChan, *ethSubConfig) - if err != nil { - logWithCommand.Fatal(err) - } - logWithCommand.Info("awaiting payloads") - // Receive response payloads and print out the results - for { - select { - case payload := <-payloadChan: - if payload.Err != "" { - logWithCommand.Error(payload.Err) - continue - } - var ethData eth.IPLDs - if err := rlp.DecodeBytes(payload.Data, ðData); err != nil { - logWithCommand.Error(err) - continue - } - var header types.Header - err = rlp.Decode(bytes.NewBuffer(ethData.Header.Data), &header) - if err != nil { - logWithCommand.Error(err) - continue - } - fmt.Printf("Header number %d, hash %s\n", header.Number.Int64(), header.Hash().Hex()) - fmt.Printf("header: %v\n", header) - for _, trxRlp := range ethData.Transactions { - var trx types.Transaction - buff := bytes.NewBuffer(trxRlp.Data) - stream := rlp.NewStream(buff, 0) - err := trx.DecodeRLP(stream) - if err != nil { - logWithCommand.Error(err) - continue - } - fmt.Printf("Transaction with hash %s\n", trx.Hash().Hex()) - fmt.Printf("trx: %v\n", trx) - } - for _, rctRlp := range ethData.Receipts { - var rct types.Receipt - buff := bytes.NewBuffer(rctRlp.Data) - stream := rlp.NewStream(buff, 0) - err = rct.DecodeRLP(stream) - if err != nil { - logWithCommand.Error(err) - continue - } - fmt.Printf("Receipt with block hash %s, trx hash %s\n", rct.BlockHash.Hex(), rct.TxHash.Hex()) - fmt.Printf("rct: %v\n", rct) - for _, l := range rct.Logs { - if len(l.Topics) < 1 { - logWithCommand.Error(fmt.Sprintf("log only has %d topics", len(l.Topics))) - continue - } - fmt.Printf("Log for block hash %s, trx hash %s, address %s, and with topic0 %s\n", - l.BlockHash.Hex(), l.TxHash.Hex(), l.Address.Hex(), l.Topics[0].Hex()) - fmt.Printf("log: %v\n", l) - } - } - // This assumes leafs only - for _, stateNode := range ethData.StateNodes { - var acct types.StateAccount - err = rlp.DecodeBytes(stateNode.IPLD.Data, &acct) - if err != nil { - logWithCommand.Error(err) - continue - } - fmt.Printf("Account for key %s, and root %s, with balance %s\n", - stateNode.StateLeafKey.Hex(), acct.Root.Hex(), acct.Balance.String()) - fmt.Printf("state account: %+v\n", acct) - } - for _, storageNode := range ethData.StorageNodes { - fmt.Printf("Storage for state key %s ", storageNode.StateLeafKey.Hex()) - fmt.Printf("with storage key %s\n", storageNode.StorageLeafKey.Hex()) - var i []interface{} - err := rlp.DecodeBytes(storageNode.IPLD.Data, &i) - if err != nil { - logWithCommand.Error(err) - continue - } - // if a value node - if len(i) == 1 { - valueBytes, ok := i[0].([]byte) - if !ok { - continue - } - fmt.Printf("Storage leaf key: %s, and value hash: %s\n", - storageNode.StorageLeafKey.Hex(), common.BytesToHash(valueBytes).Hex()) - } - } - case err = <-sub.Err(): - logWithCommand.Fatal(err) - } - } -} - -func getRPCClient() (*rpc.Client, error) { - vulcPath := viper.GetString("watcher.ethSubscription.wsPath") - if vulcPath == "" { - vulcPath = "ws://127.0.0.1:8080" // default to and try the default ws url if no path is provided - } - return rpc.Dial(vulcPath) -} diff --git a/pkg/client/client.go b/pkg/client/client.go deleted file mode 100644 index 524f0c360..000000000 --- a/pkg/client/client.go +++ /dev/null @@ -1,44 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -// Client is used by watchers to stream chain IPLD data from a vulcanizedb ipld-eth-server -package client - -import ( - "context" - - "github.com/ethereum/go-ethereum/rpc" - - "github.com/cerc-io/ipld-eth-server/v4/pkg/eth" - "github.com/cerc-io/ipld-eth-server/v4/pkg/serve" -) - -// Client is used to subscribe to the ipld-eth-server ipld data stream -type Client struct { - c *rpc.Client -} - -// NewClient creates a new Client -func NewClient(c *rpc.Client) *Client { - return &Client{ - c: c, - } -} - -// Stream is the main loop for subscribing to iplds from an ipld-eth-server server -func (c *Client) Stream(payloadChan chan serve.SubscriptionPayload, params eth.SubscriptionSettings) (*rpc.ClientSubscription, error) { - return c.c.Subscribe(context.Background(), "vdb", payloadChan, "stream", params) -} diff --git a/pkg/eth/backend.go b/pkg/eth/backend.go index 03c9f2cca..6d92f994b 100644 --- a/pkg/eth/backend.go +++ b/pkg/eth/backend.go @@ -110,7 +110,6 @@ type Backend struct { // postgres db interfaces Retriever *CIDRetriever - Fetcher *IPLDFetcher IPLDRetriever *IPLDRetriever // ethereum interfaces @@ -148,7 +147,6 @@ func NewEthBackend(db *sqlx.DB, c *Config) (*Backend, error) { return &Backend{ DB: db, Retriever: r, - Fetcher: NewIPLDFetcher(db), IPLDRetriever: NewIPLDRetriever(db), EthDB: ethDB, StateDatabase: state.NewDatabase(ethDB), diff --git a/pkg/eth/cid_retriever.go b/pkg/eth/cid_retriever.go index 591986975..b9b943b4e 100644 --- a/pkg/eth/cid_retriever.go +++ b/pkg/eth/cid_retriever.go @@ -19,27 +19,16 @@ package eth import ( "fmt" "math/big" - "strconv" "github.com/cerc-io/ipld-eth-server/v4/pkg/log" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/statediff/indexer/models" "github.com/jmoiron/sqlx" "github.com/lib/pq" "gorm.io/driver/postgres" "gorm.io/gorm" - - "github.com/cerc-io/ipld-eth-server/v4/pkg/shared" ) -// Retriever interface for substituting mocks in tests -type Retriever interface { - RetrieveFirstBlockNumber() (int64, error) - RetrieveLastBlockNumber() (int64, error) - Retrieve(filter SubscriptionSettings, blockNumber int64) ([]CIDWrapper, bool, error) -} - // CIDRetriever satisfies the CIDRetriever interface for ethereum type CIDRetriever struct { db *sqlx.DB @@ -128,158 +117,6 @@ func (ecr *CIDRetriever) RetrieveLastBlockNumber() (int64, error) { return blockNumber, err } -// Retrieve is used to retrieve all of the CIDs which conform to the passed StreamFilters -func (ecr *CIDRetriever) Retrieve(filter SubscriptionSettings, blockNumber int64) ([]CIDWrapper, bool, error) { - log.Debug("retrieving cids") - - // Begin new db tx - tx, err := ecr.db.Beginx() - if err != nil { - return nil, true, err - } - defer func() { - if p := recover(); p != nil { - shared.Rollback(tx) - panic(p) - } else if err != nil { - shared.Rollback(tx) - } else { - err = tx.Commit() - } - }() - - // Retrieve cached header CIDs at this block height - var headers []models.HeaderModel - headers, err = ecr.RetrieveHeaderCIDs(tx, blockNumber) - if err != nil { - log.Error("header cid retrieval error", err) - return nil, true, err - } - cws := make([]CIDWrapper, len(headers)) - empty := true - for i, header := range headers { - cw := new(CIDWrapper) - cw.BlockNumber = big.NewInt(blockNumber) - if !filter.HeaderFilter.Off { - cw.Header = header - empty = false - if filter.HeaderFilter.Uncles { - // Retrieve uncle cids for this header id - var uncleCIDs []models.UncleModel - uncleCIDs, err = ecr.RetrieveUncleCIDsByHeaderID(tx, header.BlockHash) - if err != nil { - log.Error("uncle cid retrieval error") - return nil, true, err - } - cw.Uncles = uncleCIDs - } - } - // Retrieve cached trx CIDs - if !filter.TxFilter.Off { - cw.Transactions, err = ecr.RetrieveTxCIDs(tx, filter.TxFilter, header.BlockHash) - if err != nil { - log.Error("transaction cid retrieval error") - return nil, true, err - } - if len(cw.Transactions) > 0 { - empty = false - } - } - trxHashes := make([]string, len(cw.Transactions)) - for j, t := range cw.Transactions { - trxHashes[j] = t.TxHash - } - // Retrieve cached receipt CIDs - if !filter.ReceiptFilter.Off { - cw.Receipts, err = ecr.RetrieveRctCIDs(tx, filter.ReceiptFilter, 0, header.BlockHash, trxHashes) - if err != nil { - log.Error("receipt cid retrieval error") - return nil, true, err - } - if len(cw.Receipts) > 0 { - empty = false - } - } - // Retrieve cached state CIDs - if !filter.StateFilter.Off { - cw.StateNodes, err = ecr.RetrieveStateCIDs(tx, filter.StateFilter, header.BlockHash) - if err != nil { - log.Error("state cid retrieval error") - return nil, true, err - } - if len(cw.StateNodes) > 0 { - empty = false - } - } - // Retrieve cached storage CIDs - if !filter.StorageFilter.Off { - cw.StorageNodes, err = ecr.RetrieveStorageCIDs(tx, filter.StorageFilter, header.BlockHash) - if err != nil { - log.Error("storage cid retrieval error") - return nil, true, err - } - if len(cw.StorageNodes) > 0 { - empty = false - } - } - cws[i] = *cw - } - - return cws, empty, err -} - -// RetrieveHeaderCIDs retrieves and returns all of the header cids at the provided blockheight -func (ecr *CIDRetriever) RetrieveHeaderCIDs(tx *sqlx.Tx, blockNumber int64) ([]models.HeaderModel, error) { - log.Debug("retrieving header cids for block ", blockNumber) - headers := make([]models.HeaderModel, 0) - pgStr := `SELECT CAST(block_number as Text), block_hash, parent_hash, cid, mh_key, CAST(td as Text), node_id, - CAST(reward as Text), state_root, uncle_root,tx_root, receipt_root, bloom, timestamp, times_validated, coinbase - FROM eth.header_cids - WHERE block_number = $1` - return headers, tx.Select(&headers, pgStr, blockNumber) -} - -// RetrieveUncleCIDsByHeaderID retrieves and returns all of the uncle cids for the provided header -func (ecr *CIDRetriever) RetrieveUncleCIDsByHeaderID(tx *sqlx.Tx, headerID string) ([]models.UncleModel, error) { - log.Debug("retrieving uncle cids for block id ", headerID) - headers := make([]models.UncleModel, 0) - pgStr := `SELECT CAST(block_number as Text), header_id, block_hash, parent_hash, cid, mh_key, CAST(reward as text) - FROM eth.uncle_cids - WHERE header_id = $1` - return headers, tx.Select(&headers, pgStr, headerID) -} - -// RetrieveTxCIDs retrieves and returns all of the trx cids at the provided blockheight that conform to the provided filter parameters -// also returns the ids for the returned transaction cids -func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID string) ([]models.TxModel, error) { - log.Debug("retrieving transaction cids for header id ", headerID) - args := make([]interface{}, 0, 3) - results := make([]models.TxModel, 0) - id := 1 - pgStr := fmt.Sprintf(`SELECT CAST(transaction_cids.block_number as Text), transaction_cids.tx_hash, - transaction_cids.header_id, transaction_cids.cid, transaction_cids.mh_key, transaction_cids.dst, - transaction_cids.src, transaction_cids.index, transaction_cids.tx_data, transaction_cids.tx_type - FROM eth.transaction_cids - INNER JOIN eth.header_cids ON ( - transaction_cids.header_id = header_cids.block_hash - AND transaction_cids.block_number = header_cids.block_number - ) - WHERE header_cids.block_hash = $%d`, id) - args = append(args, headerID) - id++ - if len(txFilter.Dst) > 0 { - pgStr += fmt.Sprintf(` AND transaction_cids.dst = ANY($%d::VARCHAR(66)[])`, id) - args = append(args, pq.Array(txFilter.Dst)) - id++ - } - if len(txFilter.Src) > 0 { - pgStr += fmt.Sprintf(` AND transaction_cids.src = ANY($%d::VARCHAR(66)[])`, id) - args = append(args, pq.Array(txFilter.Src)) - } - pgStr += ` ORDER BY transaction_cids.index` - return results, tx.Select(&results, pgStr, args...) -} - func topicFilterCondition(id *int, topics [][]string, args []interface{}, pgStr string, first bool) (string, []interface{}) { for i, topicSet := range topics { if len(topicSet) == 0 { @@ -439,38 +276,6 @@ func (ecr *CIDRetriever) RetrieveFilteredLog(tx *sqlx.Tx, rctFilter ReceiptFilte return logCIDs, nil } -// RetrieveRctCIDs retrieves and returns all of the rct cids at the provided blockheight or block hash that conform to the provided -// filter parameters and correspond to the provided tx ids -func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter ReceiptFilter, blockNumber int64, blockHash string, txHashes []string) ([]models.ReceiptModel, error) { - log.Debug("retrieving receipt cids for block ", blockNumber) - args := make([]interface{}, 0, 5) - pgStr := `SELECT CAST(receipt_cids.block_number as Text), receipt_cids.header_id, receipt_cids.tx_id, - receipt_cids.leaf_cid, receipt_cids.leaf_mh_key, receipt_cids.contract, receipt_cids.contract_hash - FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids - WHERE receipt_cids.tx_id = transaction_cids.tx_hash - AND receipt_cids.header_id = transaction_cids.header_id - AND receipt_cids.block_number = transaction_cids.block_number - AND transaction_cids.header_id = header_cids.block_hash - AND transaction_cids.block_number = header_cids.block_number` - id := 1 - if blockNumber > 0 { - pgStr += fmt.Sprintf(` AND header_cids.block_number = $%d`, id) - args = append(args, blockNumber) - id++ - } - if blockHash != "" { - pgStr += fmt.Sprintf(` AND header_cids.block_hash = $%d`, id) - args = append(args, blockHash) - id++ - } - - pgStr, args = receiptFilterConditions(&id, pgStr, args, rctFilter, txHashes) - - pgStr += ` ORDER BY transaction_cids.index` - receiptCIDs := make([]models.ReceiptModel, 0) - return receiptCIDs, tx.Select(&receiptCIDs, pgStr, args...) -} - func hasTopics(topics [][]string) bool { for _, topicSet := range topics { if len(topicSet) > 0 { @@ -480,179 +285,6 @@ func hasTopics(topics [][]string) bool { return false } -// RetrieveStateCIDs retrieves and returns all of the state node cids at the provided header ID that conform to the provided filter parameters -func (ecr *CIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter StateFilter, headerID string) ([]models.StateNodeModel, error) { - log.Debug("retrieving state cids for header id ", headerID) - args := make([]interface{}, 0, 2) - pgStr := `SELECT CAST(state_cids.block_number as Text), state_cids.header_id, - state_cids.state_leaf_key, state_cids.node_type, state_cids.cid, state_cids.mh_key, state_cids.state_path - FROM eth.state_cids - INNER JOIN eth.header_cids ON ( - state_cids.header_id = header_cids.block_hash - AND state_cids.block_number = header_cids.block_number - ) - WHERE header_cids.block_hash = $1` - args = append(args, headerID) - addrLen := len(stateFilter.Addresses) - if addrLen > 0 { - keys := make([]string, addrLen) - for i, addr := range stateFilter.Addresses { - keys[i] = crypto.Keccak256Hash(common.HexToAddress(addr).Bytes()).String() - } - pgStr += ` AND state_cids.state_leaf_key = ANY($2::VARCHAR(66)[])` - args = append(args, pq.Array(keys)) - } - if !stateFilter.IntermediateNodes { - pgStr += ` AND state_cids.node_type = 2` - } - stateNodeCIDs := make([]models.StateNodeModel, 0) - return stateNodeCIDs, tx.Select(&stateNodeCIDs, pgStr, args...) -} - -// RetrieveStorageCIDs retrieves and returns all of the storage node cids at the provided header id that conform to the provided filter parameters -func (ecr *CIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter StorageFilter, headerID string) ([]models.StorageNodeWithStateKeyModel, error) { - log.Debug("retrieving storage cids for header id ", headerID) - args := make([]interface{}, 0, 3) - pgStr := `SELECT CAST(storage_cids.block_number as Text), storage_cids.header_id, storage_cids.storage_leaf_key, - storage_cids.node_type, storage_cids.cid, storage_cids.mh_key, storage_cids.storage_path, storage_cids.state_path, - state_cids.state_leaf_key - FROM eth.storage_cids, eth.state_cids, eth.header_cids - WHERE storage_cids.header_id = state_cids.header_id - AND storage_cids.state_path = state_cids.state_path - AND storage_cids.block_number = state_cids.block_number - AND state_cids.header_id = header_cids.block_hash - AND state_cids.block_number = header_cids.block_number - AND header_cids.block_hash = $1` - args = append(args, headerID) - id := 2 - addrLen := len(storageFilter.Addresses) - if addrLen > 0 { - keys := make([]string, addrLen) - for i, addr := range storageFilter.Addresses { - keys[i] = crypto.Keccak256Hash(common.HexToAddress(addr).Bytes()).String() - } - pgStr += fmt.Sprintf(` AND state_cids.state_leaf_key = ANY($%d::VARCHAR(66)[])`, id) - args = append(args, pq.Array(keys)) - id++ - } - if len(storageFilter.StorageKeys) > 0 { - pgStr += fmt.Sprintf(` AND storage_cids.storage_leaf_key = ANY($%d::VARCHAR(66)[])`, id) - args = append(args, pq.Array(storageFilter.StorageKeys)) - } - if !storageFilter.IntermediateNodes { - pgStr += ` AND storage_cids.node_type = 2` - } - storageNodeCIDs := make([]models.StorageNodeWithStateKeyModel, 0) - return storageNodeCIDs, tx.Select(&storageNodeCIDs, pgStr, args...) -} - -// RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash -func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (models.HeaderModel, []models.UncleModel, []models.TxModel, []models.ReceiptModel, error) { - log.Debug("retrieving block cids for block hash ", blockHash.String()) - - // Begin new db tx - tx, err := ecr.db.Beginx() - if err != nil { - return models.HeaderModel{}, nil, nil, nil, err - } - defer func() { - if p := recover(); p != nil { - shared.Rollback(tx) - panic(p) - } else if err != nil { - shared.Rollback(tx) - } else { - err = tx.Commit() - } - }() - - var headerCID models.HeaderModel - headerCID, err = ecr.RetrieveHeaderCIDByHash(tx, blockHash) - if err != nil { - log.Error("header cid retrieval error") - return models.HeaderModel{}, nil, nil, nil, err - } - blockNumber, err := strconv.ParseInt(headerCID.BlockNumber, 10, 64) - if err != nil { - return models.HeaderModel{}, nil, nil, nil, err - } - var uncleCIDs []models.UncleModel - uncleCIDs, err = ecr.RetrieveUncleCIDsByHeaderID(tx, headerCID.BlockHash) - if err != nil { - log.Error("uncle cid retrieval error") - return models.HeaderModel{}, nil, nil, nil, err - } - var txCIDs []models.TxModel - txCIDs, err = ecr.RetrieveTxCIDsByHeaderID(tx, headerCID.BlockHash, blockNumber) - if err != nil { - log.Error("tx cid retrieval error") - return models.HeaderModel{}, nil, nil, nil, err - } - txHashes := make([]string, len(txCIDs)) - for i, txCID := range txCIDs { - txHashes[i] = txCID.TxHash - } - var rctCIDs []models.ReceiptModel - rctCIDs, err = ecr.RetrieveReceiptCIDsByByHeaderIDAndTxIDs(tx, headerCID.BlockHash, txHashes, blockNumber) - if err != nil { - log.Error("rct cid retrieval error") - } - return headerCID, uncleCIDs, txCIDs, rctCIDs, err -} - -// RetrieveBlockByNumber returns all of the CIDs needed to compose an entire block, for a given block number -func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (models.HeaderModel, []models.UncleModel, []models.TxModel, []models.ReceiptModel, error) { - log.Debug("retrieving block cids for block number ", blockNumber) - - // Begin new db tx - tx, err := ecr.db.Beginx() - if err != nil { - return models.HeaderModel{}, nil, nil, nil, err - } - defer func() { - if p := recover(); p != nil { - shared.Rollback(tx) - panic(p) - } else if err != nil { - shared.Rollback(tx) - } else { - err = tx.Commit() - } - }() - - var headerCID []models.HeaderModel - headerCID, err = ecr.RetrieveHeaderCIDs(tx, blockNumber) - if err != nil { - log.Error("header cid retrieval error") - return models.HeaderModel{}, nil, nil, nil, err - } - if len(headerCID) < 1 { - return models.HeaderModel{}, nil, nil, nil, fmt.Errorf("header cid retrieval error, no header CIDs found at block %d", blockNumber) - } - var uncleCIDs []models.UncleModel - uncleCIDs, err = ecr.RetrieveUncleCIDsByHeaderID(tx, headerCID[0].BlockHash) - if err != nil { - log.Error("uncle cid retrieval error") - return models.HeaderModel{}, nil, nil, nil, err - } - var txCIDs []models.TxModel - txCIDs, err = ecr.RetrieveTxCIDsByHeaderID(tx, headerCID[0].BlockHash, blockNumber) - if err != nil { - log.Error("tx cid retrieval error") - return models.HeaderModel{}, nil, nil, nil, err - } - txHashes := make([]string, len(txCIDs)) - for i, txCID := range txCIDs { - txHashes[i] = txCID.TxHash - } - var rctCIDs []models.ReceiptModel - rctCIDs, err = ecr.RetrieveReceiptCIDsByByHeaderIDAndTxIDs(tx, headerCID[0].BlockHash, txHashes, blockNumber) - if err != nil { - log.Error("rct cid retrieval error") - } - return headerCID[0], uncleCIDs, txCIDs, rctCIDs, err -} - // RetrieveHeaderCIDByHash returns the header for the given block hash func (ecr *CIDRetriever) RetrieveHeaderCIDByHash(tx *sqlx.Tx, blockHash common.Hash) (models.HeaderModel, error) { log.Debug("retrieving header cids for block hash ", blockHash.String()) @@ -663,35 +295,6 @@ func (ecr *CIDRetriever) RetrieveHeaderCIDByHash(tx *sqlx.Tx, blockHash common.H return headerCID, tx.Get(&headerCID, pgStr, blockHash.String()) } -// RetrieveTxCIDsByHeaderID retrieves all tx CIDs for the given header id -func (ecr *CIDRetriever) RetrieveTxCIDsByHeaderID(tx *sqlx.Tx, headerID string, blockNumber int64) ([]models.TxModel, error) { - log.Debug("retrieving tx cids for block id ", headerID) - pgStr := `SELECT CAST(block_number as Text), header_id, index, tx_hash, cid, mh_key, - dst, src, tx_data, tx_type, value - FROM eth.transaction_cids - WHERE header_id = $1 AND block_number = $2 - ORDER BY index` - var txCIDs []models.TxModel - return txCIDs, tx.Select(&txCIDs, pgStr, headerID, blockNumber) -} - -// RetrieveReceiptCIDsByByHeaderIDAndTxIDs retrieves receipt CIDs by their associated tx IDs for the given header id -func (ecr *CIDRetriever) RetrieveReceiptCIDsByByHeaderIDAndTxIDs(tx *sqlx.Tx, headerID string, txHashes []string, blockNumber int64) ([]models.ReceiptModel, error) { - log.Debugf("retrieving receipt cids for tx hashes %v", txHashes) - pgStr := `SELECT CAST(receipt_cids.block_number as Text), receipt_cids.header_id, receipt_cids.tx_id, receipt_cids.leaf_cid, - receipt_cids.leaf_mh_key, receipt_cids.contract, receipt_cids.contract_hash - FROM eth.receipt_cids, eth.transaction_cids - WHERE tx_id = ANY($2) - AND receipt_cids.tx_id = transaction_cids.tx_hash - AND receipt_cids.header_id = transaction_cids.header_id - AND receipt_cids.block_number = transaction_cids.block_number - AND transaction_cids.header_id = $1 - AND transaction_cids.block_number = $3 - ORDER BY transaction_cids.index` - var rctCIDs []models.ReceiptModel - return rctCIDs, tx.Select(&rctCIDs, pgStr, headerID, pq.Array(txHashes), blockNumber) -} - // RetrieveHeaderAndTxCIDsByBlockNumber retrieves header CIDs and their associated tx CIDs by block number func (ecr *CIDRetriever) RetrieveHeaderAndTxCIDsByBlockNumber(blockNumber int64) ([]HeaderCIDRecord, error) { log.Debug("retrieving header cids and tx cids for block number ", blockNumber) diff --git a/pkg/eth/cid_retriever_test.go b/pkg/eth/cid_retriever_test.go index 5ab418271..2941da0dc 100644 --- a/pkg/eth/cid_retriever_test.go +++ b/pkg/eth/cid_retriever_test.go @@ -17,197 +17,18 @@ package eth_test import ( - "math/big" - "github.com/cerc-io/ipld-eth-server/v4/pkg/eth" "github.com/cerc-io/ipld-eth-server/v4/pkg/eth/test_helpers" "github.com/cerc-io/ipld-eth-server/v4/pkg/shared" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" - "github.com/ethereum/go-ethereum/statediff/indexer/models" "github.com/ethereum/go-ethereum/trie" "github.com/jmoiron/sqlx" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" ) -var ( - openFilter = eth.SubscriptionSettings{ - Start: big.NewInt(0), - End: big.NewInt(1), - HeaderFilter: eth.HeaderFilter{}, - TxFilter: eth.TxFilter{}, - ReceiptFilter: eth.ReceiptFilter{}, - StateFilter: eth.StateFilter{}, - StorageFilter: eth.StorageFilter{}, - } - rctAddressFilter = eth.SubscriptionSettings{ - Start: big.NewInt(0), - End: big.NewInt(1), - HeaderFilter: eth.HeaderFilter{ - Off: true, - }, - TxFilter: eth.TxFilter{ - Off: true, - }, - ReceiptFilter: eth.ReceiptFilter{ - LogAddresses: []string{test_helpers.Address.String()}, - }, - StateFilter: eth.StateFilter{ - Off: true, - }, - StorageFilter: eth.StorageFilter{ - Off: true, - }, - } - rctTopicsFilter = eth.SubscriptionSettings{ - Start: big.NewInt(0), - End: big.NewInt(1), - HeaderFilter: eth.HeaderFilter{ - Off: true, - }, - TxFilter: eth.TxFilter{ - Off: true, - }, - ReceiptFilter: eth.ReceiptFilter{ - Topics: [][]string{{"0x0000000000000000000000000000000000000000000000000000000000000004"}}, - }, - StateFilter: eth.StateFilter{ - Off: true, - }, - StorageFilter: eth.StorageFilter{ - Off: true, - }, - } - rctTopicsAndAddressFilter = eth.SubscriptionSettings{ - Start: big.NewInt(0), - End: big.NewInt(1), - HeaderFilter: eth.HeaderFilter{ - Off: true, - }, - TxFilter: eth.TxFilter{ - Off: true, - }, - ReceiptFilter: eth.ReceiptFilter{ - Topics: [][]string{ - {"0x0000000000000000000000000000000000000000000000000000000000000004"}, - {"0x0000000000000000000000000000000000000000000000000000000000000006"}, - }, - LogAddresses: []string{test_helpers.Address.String()}, - }, - StateFilter: eth.StateFilter{ - Off: true, - }, - StorageFilter: eth.StorageFilter{ - Off: true, - }, - } - rctTopicsAndAddressFilterFail = eth.SubscriptionSettings{ - Start: big.NewInt(0), - End: big.NewInt(1), - HeaderFilter: eth.HeaderFilter{ - Off: true, - }, - TxFilter: eth.TxFilter{ - Off: true, - }, - ReceiptFilter: eth.ReceiptFilter{ - Topics: [][]string{ - {"0x0000000000000000000000000000000000000000000000000000000000000004"}, - {"0x0000000000000000000000000000000000000000000000000000000000000007"}, // This topic won't match on the mocks.Address.String() contract receipt - }, - LogAddresses: []string{test_helpers.Address.String()}, - }, - StateFilter: eth.StateFilter{ - Off: true, - }, - StorageFilter: eth.StorageFilter{ - Off: true, - }, - } - rctAddressesAndTopicFilter = eth.SubscriptionSettings{ - Start: big.NewInt(0), - End: big.NewInt(1), - HeaderFilter: eth.HeaderFilter{ - Off: true, - }, - TxFilter: eth.TxFilter{ - Off: true, - }, - ReceiptFilter: eth.ReceiptFilter{ - Topics: [][]string{{"0x0000000000000000000000000000000000000000000000000000000000000005"}}, - LogAddresses: []string{test_helpers.Address.String(), test_helpers.AnotherAddress.String()}, - }, - StateFilter: eth.StateFilter{ - Off: true, - }, - StorageFilter: eth.StorageFilter{ - Off: true, - }, - } - rctsForAllCollectedTrxs = eth.SubscriptionSettings{ - Start: big.NewInt(0), - End: big.NewInt(1), - HeaderFilter: eth.HeaderFilter{ - Off: true, - }, - TxFilter: eth.TxFilter{}, // Trx filter open so we will collect all trxs, therefore we will also collect all corresponding rcts despite rct filter - ReceiptFilter: eth.ReceiptFilter{ - MatchTxs: true, - Topics: [][]string{{"0x0000000000000000000000000000000000000000000000000000000000000006"}}, // Topic0 isn't one of the topic0s we have - LogAddresses: []string{"0x0000000000000000000000000000000000000002"}, // Contract isn't one of the contracts we have - }, - StateFilter: eth.StateFilter{ - Off: true, - }, - StorageFilter: eth.StorageFilter{ - Off: true, - }, - } - rctsForSelectCollectedTrxs = eth.SubscriptionSettings{ - Start: big.NewInt(0), - End: big.NewInt(1), - HeaderFilter: eth.HeaderFilter{ - Off: true, - }, - TxFilter: eth.TxFilter{ - Dst: []string{test_helpers.AnotherAddress.String()}, // We only filter for one of the trxs so we will only get the one corresponding receipt - }, - ReceiptFilter: eth.ReceiptFilter{ - MatchTxs: true, - Topics: [][]string{{"0x0000000000000000000000000000000000000000000000000000000000000006"}}, // Topic0 isn't one of the topic0s we have - LogAddresses: []string{"0x0000000000000000000000000000000000000002"}, // Contract isn't one of the contracts we have - }, - StateFilter: eth.StateFilter{ - Off: true, - }, - StorageFilter: eth.StorageFilter{ - Off: true, - }, - } - stateFilter = eth.SubscriptionSettings{ - Start: big.NewInt(0), - End: big.NewInt(1), - HeaderFilter: eth.HeaderFilter{ - Off: true, - }, - TxFilter: eth.TxFilter{ - Off: true, - }, - ReceiptFilter: eth.ReceiptFilter{ - Off: true, - }, - StateFilter: eth.StateFilter{ - Addresses: []string{test_helpers.AccountAddresss.Hex()}, - }, - StorageFilter: eth.StorageFilter{ - Off: true, - }, - } -) - var _ = Describe("Retriever", func() { var ( db *sqlx.DB @@ -236,196 +57,6 @@ var _ = Describe("Retriever", func() { err = tx.Submit(err) Expect(err).ToNot(HaveOccurred()) }) - It("Retrieves all CIDs for the given blocknumber when provided an open filter", func() { - type rctCIDAndMHKeyResult struct { - LeafCID string `db:"leaf_cid"` - LeafMhKey string `db:"leaf_mh_key"` - } - expectedRctCIDsAndLeafNodes := make([]rctCIDAndMHKeyResult, 0) - pgStr := `SELECT receipt_cids.leaf_cid, receipt_cids.leaf_mh_key FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids - WHERE receipt_cids.tx_id = transaction_cids.tx_hash - AND transaction_cids.header_id = header_cids.block_hash - AND header_cids.block_number = $1 - ORDER BY transaction_cids.index` - err := db.Select(&expectedRctCIDsAndLeafNodes, pgStr, test_helpers.BlockNumber.Uint64()) - Expect(err).ToNot(HaveOccurred()) - cids, empty, err := retriever.Retrieve(openFilter, 1) - Expect(err).ToNot(HaveOccurred()) - Expect(empty).ToNot(BeTrue()) - Expect(len(cids)).To(Equal(1)) - Expect(cids[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber)) - - expectedHeaderCID := test_helpers.MockCIDWrapper.Header - expectedHeaderCID.BlockHash = cids[0].Header.BlockHash - expectedHeaderCID.NodeID = cids[0].Header.NodeID - Expect(cids[0].Header).To(Equal(expectedHeaderCID)) - Expect(len(cids[0].Transactions)).To(Equal(4)) - Expect(eth.TxModelsContainsCID(cids[0].Transactions, test_helpers.MockCIDWrapper.Transactions[0].CID)).To(BeTrue()) - Expect(eth.TxModelsContainsCID(cids[0].Transactions, test_helpers.MockCIDWrapper.Transactions[1].CID)).To(BeTrue()) - Expect(eth.TxModelsContainsCID(cids[0].Transactions, test_helpers.MockCIDWrapper.Transactions[2].CID)).To(BeTrue()) - Expect(len(cids[0].Receipts)).To(Equal(4)) - Expect(eth.ReceiptModelsContainsCID(cids[0].Receipts, expectedRctCIDsAndLeafNodes[0].LeafCID)).To(BeTrue()) - Expect(eth.ReceiptModelsContainsCID(cids[0].Receipts, expectedRctCIDsAndLeafNodes[1].LeafCID)).To(BeTrue()) - Expect(eth.ReceiptModelsContainsCID(cids[0].Receipts, expectedRctCIDsAndLeafNodes[2].LeafCID)).To(BeTrue()) - Expect(len(cids[0].StateNodes)).To(Equal(2)) - - for _, stateNode := range cids[0].StateNodes { - if stateNode.CID == test_helpers.State1CID.String() { - Expect(stateNode.StateKey).To(Equal(common.BytesToHash(test_helpers.ContractLeafKey).Hex())) - Expect(stateNode.NodeType).To(Equal(2)) - Expect(stateNode.Path).To(Equal([]byte{'\x06'})) - } - if stateNode.CID == test_helpers.State2CID.String() { - Expect(stateNode.StateKey).To(Equal(common.BytesToHash(test_helpers.AccountLeafKey).Hex())) - Expect(stateNode.NodeType).To(Equal(2)) - Expect(stateNode.Path).To(Equal([]byte{'\x0c'})) - } - } - Expect(len(cids[0].StorageNodes)).To(Equal(1)) - expectedStorageNodeCIDs := test_helpers.MockCIDWrapper.StorageNodes - expectedStorageNodeCIDs[0].HeaderID = cids[0].StorageNodes[0].HeaderID - expectedStorageNodeCIDs[0].StatePath = cids[0].StorageNodes[0].StatePath - Expect(cids[0].StorageNodes).To(Equal(expectedStorageNodeCIDs)) - }) - - It("Applies filters from the provided config.Subscription", func() { - type rctCIDAndMHKeyResult struct { - LeafCID string `db:"leaf_cid"` - LeafMhKey string `db:"leaf_mh_key"` - } - expectedRctCIDsAndLeafNodes := make([]rctCIDAndMHKeyResult, 0) - pgStr := `SELECT receipt_cids.leaf_cid, receipt_cids.leaf_mh_key FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids - WHERE receipt_cids.tx_id = transaction_cids.tx_hash - AND transaction_cids.header_id = header_cids.block_hash - AND header_cids.block_number = $1 - ORDER BY transaction_cids.index` - err := db.Select(&expectedRctCIDsAndLeafNodes, pgStr, test_helpers.BlockNumber.Uint64()) - Expect(err).ToNot(HaveOccurred()) - cids1, empty, err := retriever.Retrieve(rctAddressFilter, 1) - Expect(err).ToNot(HaveOccurred()) - Expect(empty).ToNot(BeTrue()) - Expect(len(cids1)).To(Equal(1)) - Expect(cids1[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber)) - Expect(cids1[0].Header).To(Equal(models.HeaderModel{})) - Expect(len(cids1[0].Transactions)).To(Equal(0)) - Expect(len(cids1[0].StateNodes)).To(Equal(0)) - Expect(len(cids1[0].StorageNodes)).To(Equal(0)) - Expect(len(cids1[0].Receipts)).To(Equal(1)) - expectedReceiptCID := test_helpers.MockCIDWrapper.Receipts[0] - expectedReceiptCID.TxID = cids1[0].Receipts[0].TxID - expectedReceiptCID.LeafCID = expectedRctCIDsAndLeafNodes[0].LeafCID - expectedReceiptCID.LeafMhKey = expectedRctCIDsAndLeafNodes[0].LeafMhKey - Expect(cids1[0].Receipts[0]).To(Equal(expectedReceiptCID)) - - cids2, empty, err := retriever.Retrieve(rctTopicsFilter, 1) - Expect(err).ToNot(HaveOccurred()) - Expect(empty).ToNot(BeTrue()) - Expect(len(cids2)).To(Equal(1)) - Expect(cids2[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber)) - Expect(cids2[0].Header).To(Equal(models.HeaderModel{})) - Expect(len(cids2[0].Transactions)).To(Equal(0)) - Expect(len(cids2[0].StateNodes)).To(Equal(0)) - Expect(len(cids2[0].StorageNodes)).To(Equal(0)) - Expect(len(cids2[0].Receipts)).To(Equal(1)) - expectedReceiptCID = test_helpers.MockCIDWrapper.Receipts[0] - expectedReceiptCID.TxID = cids2[0].Receipts[0].TxID - expectedReceiptCID.LeafCID = expectedRctCIDsAndLeafNodes[0].LeafCID - expectedReceiptCID.LeafMhKey = expectedRctCIDsAndLeafNodes[0].LeafMhKey - Expect(cids2[0].Receipts[0]).To(Equal(expectedReceiptCID)) - - cids3, empty, err := retriever.Retrieve(rctTopicsAndAddressFilter, 1) - Expect(err).ToNot(HaveOccurred()) - Expect(empty).ToNot(BeTrue()) - Expect(len(cids3)).To(Equal(1)) - Expect(cids3[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber)) - Expect(cids3[0].Header).To(Equal(models.HeaderModel{})) - Expect(len(cids3[0].Transactions)).To(Equal(0)) - Expect(len(cids3[0].StateNodes)).To(Equal(0)) - Expect(len(cids3[0].StorageNodes)).To(Equal(0)) - Expect(len(cids3[0].Receipts)).To(Equal(1)) - expectedReceiptCID = test_helpers.MockCIDWrapper.Receipts[0] - expectedReceiptCID.TxID = cids3[0].Receipts[0].TxID - expectedReceiptCID.LeafCID = expectedRctCIDsAndLeafNodes[0].LeafCID - expectedReceiptCID.LeafMhKey = expectedRctCIDsAndLeafNodes[0].LeafMhKey - Expect(cids3[0].Receipts[0]).To(Equal(expectedReceiptCID)) - - cids4, empty, err := retriever.Retrieve(rctAddressesAndTopicFilter, 1) - Expect(err).ToNot(HaveOccurred()) - Expect(empty).ToNot(BeTrue()) - Expect(len(cids4)).To(Equal(1)) - Expect(cids4[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber)) - Expect(cids4[0].Header).To(Equal(models.HeaderModel{})) - Expect(len(cids4[0].Transactions)).To(Equal(0)) - Expect(len(cids4[0].StateNodes)).To(Equal(0)) - Expect(len(cids4[0].StorageNodes)).To(Equal(0)) - Expect(len(cids4[0].Receipts)).To(Equal(1)) - expectedReceiptCID = test_helpers.MockCIDWrapper.Receipts[1] - expectedReceiptCID.TxID = cids4[0].Receipts[0].TxID - expectedReceiptCID.LeafCID = expectedRctCIDsAndLeafNodes[1].LeafCID - expectedReceiptCID.LeafMhKey = expectedRctCIDsAndLeafNodes[1].LeafMhKey - Expect(cids4[0].Receipts[0]).To(Equal(expectedReceiptCID)) - - cids5, empty, err := retriever.Retrieve(rctsForAllCollectedTrxs, 1) - Expect(err).ToNot(HaveOccurred()) - Expect(empty).ToNot(BeTrue()) - Expect(len(cids5)).To(Equal(1)) - Expect(cids5[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber)) - Expect(cids5[0].Header).To(Equal(models.HeaderModel{})) - Expect(len(cids5[0].Transactions)).To(Equal(4)) - Expect(eth.TxModelsContainsCID(cids5[0].Transactions, test_helpers.Trx1CID.String())).To(BeTrue()) - Expect(eth.TxModelsContainsCID(cids5[0].Transactions, test_helpers.Trx2CID.String())).To(BeTrue()) - Expect(eth.TxModelsContainsCID(cids5[0].Transactions, test_helpers.Trx3CID.String())).To(BeTrue()) - Expect(len(cids5[0].StateNodes)).To(Equal(0)) - Expect(len(cids5[0].StorageNodes)).To(Equal(0)) - Expect(len(cids5[0].Receipts)).To(Equal(4)) - Expect(eth.ReceiptModelsContainsCID(cids5[0].Receipts, expectedRctCIDsAndLeafNodes[0].LeafCID)).To(BeTrue()) - Expect(eth.ReceiptModelsContainsCID(cids5[0].Receipts, expectedRctCIDsAndLeafNodes[1].LeafCID)).To(BeTrue()) - Expect(eth.ReceiptModelsContainsCID(cids5[0].Receipts, expectedRctCIDsAndLeafNodes[2].LeafCID)).To(BeTrue()) - - cids6, empty, err := retriever.Retrieve(rctsForSelectCollectedTrxs, 1) - Expect(err).ToNot(HaveOccurred()) - Expect(empty).ToNot(BeTrue()) - Expect(len(cids6)).To(Equal(1)) - Expect(cids6[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber)) - Expect(cids6[0].Header).To(Equal(models.HeaderModel{})) - Expect(len(cids6[0].Transactions)).To(Equal(1)) - expectedTxCID := test_helpers.MockCIDWrapper.Transactions[1] - expectedTxCID.TxHash = cids6[0].Transactions[0].TxHash - expectedTxCID.HeaderID = cids6[0].Transactions[0].HeaderID - Expect(cids6[0].Transactions[0]).To(Equal(expectedTxCID)) - Expect(len(cids6[0].StateNodes)).To(Equal(0)) - Expect(len(cids6[0].StorageNodes)).To(Equal(0)) - Expect(len(cids6[0].Receipts)).To(Equal(1)) - expectedReceiptCID = test_helpers.MockCIDWrapper.Receipts[1] - expectedReceiptCID.TxID = cids6[0].Receipts[0].TxID - expectedReceiptCID.LeafCID = expectedRctCIDsAndLeafNodes[1].LeafCID - expectedReceiptCID.LeafMhKey = expectedRctCIDsAndLeafNodes[1].LeafMhKey - Expect(cids6[0].Receipts[0]).To(Equal(expectedReceiptCID)) - - cids7, empty, err := retriever.Retrieve(stateFilter, 1) - Expect(err).ToNot(HaveOccurred()) - Expect(empty).ToNot(BeTrue()) - Expect(len(cids7)).To(Equal(1)) - Expect(cids7[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber)) - Expect(cids7[0].Header).To(Equal(models.HeaderModel{})) - Expect(len(cids7[0].Transactions)).To(Equal(0)) - Expect(len(cids7[0].Receipts)).To(Equal(0)) - Expect(len(cids7[0].StorageNodes)).To(Equal(0)) - Expect(len(cids7[0].StateNodes)).To(Equal(1)) - Expect(cids7[0].StateNodes[0]).To(Equal(models.StateNodeModel{ - BlockNumber: "1", - HeaderID: cids7[0].StateNodes[0].HeaderID, - NodeType: 2, - StateKey: common.BytesToHash(test_helpers.AccountLeafKey).Hex(), - CID: test_helpers.State2CID.String(), - MhKey: test_helpers.State2MhKey, - Path: []byte{'\x0c'}, - })) - - _, empty, err = retriever.Retrieve(rctTopicsAndAddressFilterFail, 1) - Expect(err).ToNot(HaveOccurred()) - Expect(empty).To(BeTrue()) - }) }) Describe("RetrieveFirstBlockNumber", func() { diff --git a/pkg/eth/filterer.go b/pkg/eth/filterer.go deleted file mode 100644 index d4cc936a8..000000000 --- a/pkg/eth/filterer.go +++ /dev/null @@ -1,366 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package eth - -import ( - "bytes" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/rlp" - "github.com/ethereum/go-ethereum/statediff/indexer/ipld" - "github.com/ethereum/go-ethereum/statediff/indexer/models" - sdtypes "github.com/ethereum/go-ethereum/statediff/types" - "github.com/ipfs/go-cid" - "github.com/multiformats/go-multihash" -) - -// Filterer interface for substituing mocks in tests -type Filterer interface { - Filter(filter SubscriptionSettings, payload ConvertedPayload) (*IPLDs, error) -} - -// ResponseFilterer satisfies the ResponseFilterer interface for ethereum -type ResponseFilterer struct{} - -// NewResponseFilterer creates a new Filterer satisfying the ResponseFilterer interface -func NewResponseFilterer() *ResponseFilterer { - return &ResponseFilterer{} -} - -// Filter is used to filter through eth data to extract and package requested data into a Payload -func (s *ResponseFilterer) Filter(filter SubscriptionSettings, payload ConvertedPayload) (*IPLDs, error) { - if checkRange(filter.Start.Int64(), filter.End.Int64(), payload.Block.Number().Int64()) { - response := new(IPLDs) - response.TotalDifficulty = payload.TotalDifficulty - if err := s.filterHeaders(filter.HeaderFilter, response, payload); err != nil { - return nil, err - } - txHashes, err := s.filterTransactions(filter.TxFilter, response, payload) - if err != nil { - return nil, err - } - var filterTxs []common.Hash - if filter.ReceiptFilter.MatchTxs { - filterTxs = txHashes - } - if err := s.filerReceipts(filter.ReceiptFilter, response, payload, filterTxs); err != nil { - return nil, err - } - if err := s.filterStateAndStorage(filter.StateFilter, filter.StorageFilter, response, payload); err != nil { - return nil, err - } - response.BlockNumber = payload.Block.Number() - return response, nil - } - return nil, nil -} - -func (s *ResponseFilterer) filterHeaders(headerFilter HeaderFilter, response *IPLDs, payload ConvertedPayload) error { - if !headerFilter.Off { - headerRLP, err := rlp.EncodeToBytes(payload.Block.Header()) - if err != nil { - return err - } - cid, err := ipld.RawdataToCid(ipld.MEthHeader, headerRLP, multihash.KECCAK_256) - if err != nil { - return err - } - response.Header = models.IPLDModel{ - BlockNumber: payload.Block.Number().String(), - Data: headerRLP, - Key: cid.String(), - } - if headerFilter.Uncles { - response.Uncles = make([]models.IPLDModel, len(payload.Block.Body().Uncles)) - for i, uncle := range payload.Block.Body().Uncles { - uncleRlp, err := rlp.EncodeToBytes(uncle) - if err != nil { - return err - } - cid, err := ipld.RawdataToCid(ipld.MEthHeader, uncleRlp, multihash.KECCAK_256) - if err != nil { - return err - } - response.Uncles[i] = models.IPLDModel{ - BlockNumber: uncle.Number.String(), - Data: uncleRlp, - Key: cid.String(), - } - } - } - } - return nil -} - -func checkRange(start, end, actual int64) bool { - if (end <= 0 || end >= actual) && start <= actual { - return true - } - return false -} - -func (s *ResponseFilterer) filterTransactions(trxFilter TxFilter, response *IPLDs, payload ConvertedPayload) ([]common.Hash, error) { - var trxHashes []common.Hash - if !trxFilter.Off { - trxLen := len(payload.Block.Body().Transactions) - trxHashes = make([]common.Hash, 0, trxLen) - response.Transactions = make([]models.IPLDModel, 0, trxLen) - for i, trx := range payload.Block.Body().Transactions { - // TODO: check if want corresponding receipt and if we do we must include this transaction - if checkTransactionAddrs(trxFilter.Src, trxFilter.Dst, payload.TxMetaData[i].Src, payload.TxMetaData[i].Dst) { - trxBuffer := new(bytes.Buffer) - if err := trx.EncodeRLP(trxBuffer); err != nil { - return nil, err - } - data := trxBuffer.Bytes() - cid, err := ipld.RawdataToCid(ipld.MEthTx, data, multihash.KECCAK_256) - if err != nil { - return nil, err - } - response.Transactions = append(response.Transactions, models.IPLDModel{ - Data: data, - Key: cid.String(), - }) - trxHashes = append(trxHashes, trx.Hash()) - } - } - } - return trxHashes, nil -} - -// checkTransactionAddrs returns true if either the transaction src and dst are one of the wanted src and dst addresses -func checkTransactionAddrs(wantedSrc, wantedDst []string, actualSrc, actualDst string) bool { - // If we aren't filtering for any addresses, every transaction is a go - if len(wantedDst) == 0 && len(wantedSrc) == 0 { - return true - } - for _, src := range wantedSrc { - if src == actualSrc { - return true - } - } - for _, dst := range wantedDst { - if dst == actualDst { - return true - } - } - return false -} - -func (s *ResponseFilterer) filerReceipts(receiptFilter ReceiptFilter, response *IPLDs, payload ConvertedPayload, trxHashes []common.Hash) error { - if !receiptFilter.Off { - response.Receipts = make([]models.IPLDModel, 0, len(payload.Receipts)) - rctLeafCID, rctIPLDData, err := GetRctLeafNodeData(payload.Receipts) - if err != nil { - return err - } - - for idx, receipt := range payload.Receipts { - // topics is always length 4 - topics := make([][]string, 4) - contracts := make([]string, len(receipt.Logs)) - for _, l := range receipt.Logs { - contracts = append(contracts, l.Address.String()) - for idx, t := range l.Topics { - topics[idx] = append(topics[idx], t.String()) - } - } - - // TODO: Verify this filter logic. - if checkReceipts(receipt, receiptFilter.Topics, topics, receiptFilter.LogAddresses, contracts, trxHashes) { - response.Receipts = append(response.Receipts, models.IPLDModel{ - BlockNumber: payload.Block.Number().String(), - Data: rctIPLDData[idx], - Key: rctLeafCID[idx].String(), - }) - } - } - } - return nil -} - -func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics [][]string, wantedAddresses []string, actualAddresses []string, wantedTrxHashes []common.Hash) bool { - // If we aren't filtering for any topics, contracts, or corresponding trxs then all receipts are a go - if len(wantedTopics) == 0 && len(wantedAddresses) == 0 && len(wantedTrxHashes) == 0 { - return true - } - // Keep receipts that are from watched txs - for _, wantedTrxHash := range wantedTrxHashes { - if bytes.Equal(wantedTrxHash.Bytes(), rct.TxHash.Bytes()) { - return true - } - } - // If there are no wanted contract addresses, we keep all receipts that match the topic filter - if len(wantedAddresses) == 0 { - if match := filterMatch(wantedTopics, actualTopics); match { - return true - } - } - // If there are wanted contract addresses to filter on - for _, wantedAddr := range wantedAddresses { - // and this is an address of interest - for _, actualAddr := range actualAddresses { - if wantedAddr == actualAddr { - // we keep the receipt if it matches on the topic filter - if match := filterMatch(wantedTopics, actualTopics); match { - return true - } - } - } - } - return false -} - -// filterMatch returns true if the actualTopics conform to the wantedTopics filter -func filterMatch(wantedTopics, actualTopics [][]string) bool { - // actualTopics should always be length 4, but the members can be nil slices - matches := 0 - for i, actualTopicSet := range actualTopics { - if i < len(wantedTopics) && len(wantedTopics[i]) > 0 { - // If we have topics in this filter slot, count as a match if one of the topics matches - matches += slicesShareString(actualTopicSet, wantedTopics[i]) - } else { - // Filter slot is either empty or doesn't exist => not matching any topics at this slot => counts as a match - matches++ - } - } - return matches == 4 -} - -// returns 1 if the two slices have a string in common, 0 if they do not -func slicesShareString(slice1, slice2 []string) int { - for _, str1 := range slice1 { - for _, str2 := range slice2 { - if str1 == str2 { - return 1 - } - } - } - return 0 -} - -// filterStateAndStorage filters state and storage nodes into the response according to the provided filters -func (s *ResponseFilterer) filterStateAndStorage(stateFilter StateFilter, storageFilter StorageFilter, response *IPLDs, payload ConvertedPayload) error { - response.StateNodes = make([]StateNode, 0, len(payload.StateNodes)) - response.StorageNodes = make([]StorageNode, 0) - stateAddressFilters := make([]common.Hash, len(stateFilter.Addresses)) - for i, addr := range stateFilter.Addresses { - stateAddressFilters[i] = crypto.Keccak256Hash(common.HexToAddress(addr).Bytes()) - } - storageAddressFilters := make([]common.Hash, len(storageFilter.Addresses)) - for i, addr := range storageFilter.Addresses { - storageAddressFilters[i] = crypto.Keccak256Hash(common.HexToAddress(addr).Bytes()) - } - storageKeyFilters := make([]common.Hash, len(storageFilter.StorageKeys)) - for i, store := range storageFilter.StorageKeys { - storageKeyFilters[i] = common.HexToHash(store) - } - for _, stateNode := range payload.StateNodes { - if !stateFilter.Off && checkNodeKeys(stateAddressFilters, stateNode.LeafKey) { - if stateNode.NodeType == sdtypes.Leaf || stateFilter.IntermediateNodes { - cid, err := ipld.RawdataToCid(ipld.MEthStateTrie, stateNode.NodeValue, multihash.KECCAK_256) - if err != nil { - return err - } - response.StateNodes = append(response.StateNodes, StateNode{ - StateLeafKey: common.BytesToHash(stateNode.LeafKey), - Path: stateNode.Path, - IPLD: models.IPLDModel{ - BlockNumber: payload.Block.Number().String(), - Data: stateNode.NodeValue, - Key: cid.String(), - }, - Type: stateNode.NodeType, - }) - } - } - if !storageFilter.Off && checkNodeKeys(storageAddressFilters, stateNode.LeafKey) { - for _, storageNode := range payload.StorageNodes[common.Bytes2Hex(stateNode.Path)] { - if checkNodeKeys(storageKeyFilters, storageNode.LeafKey) { - cid, err := ipld.RawdataToCid(ipld.MEthStorageTrie, storageNode.NodeValue, multihash.KECCAK_256) - if err != nil { - return err - } - response.StorageNodes = append(response.StorageNodes, StorageNode{ - StateLeafKey: common.BytesToHash(stateNode.LeafKey), - StorageLeafKey: common.BytesToHash(storageNode.LeafKey), - IPLD: models.IPLDModel{ - BlockNumber: payload.Block.Number().String(), - Data: storageNode.NodeValue, - Key: cid.String(), - }, - Type: storageNode.NodeType, - Path: storageNode.Path, - }) - } - } - } - } - return nil -} - -func checkNodeKeys(wantedKeys []common.Hash, actualKey []byte) bool { - // If we aren't filtering for any specific keys, all nodes are a go - if len(wantedKeys) == 0 { - return true - } - for _, key := range wantedKeys { - if bytes.Equal(key.Bytes(), actualKey) { - return true - } - } - return false -} - -// GetRctLeafNodeData converts the receipts to receipt trie and returns the receipt leaf node IPLD data and -// corresponding CIDs -func GetRctLeafNodeData(rcts types.Receipts) ([]cid.Cid, [][]byte, error) { - receiptTrie := ipld.NewRctTrie() - for idx, rct := range rcts { - ethRct, err := ipld.NewReceipt(rct) - if err != nil { - return nil, nil, err - } - if err = receiptTrie.Add(idx, ethRct.RawData()); err != nil { - return nil, nil, err - } - } - - rctLeafNodes, keys, err := receiptTrie.GetLeafNodes() - if err != nil { - return nil, nil, err - } - - ethRctleafNodeCids := make([]cid.Cid, len(rctLeafNodes)) - ethRctleafNodeData := make([][]byte, len(rctLeafNodes)) - for i, rln := range rctLeafNodes { - var idx uint - - r := bytes.NewReader(keys[i].TrieKey) - err = rlp.Decode(r, &idx) - if err != nil { - return nil, nil, err - } - - ethRctleafNodeCids[idx] = rln.Cid() - ethRctleafNodeData[idx] = rln.RawData() - } - - return ethRctleafNodeCids, ethRctleafNodeData, nil -} diff --git a/pkg/eth/filterer_test.go b/pkg/eth/filterer_test.go deleted file mode 100644 index 530ec01a1..000000000 --- a/pkg/eth/filterer_test.go +++ /dev/null @@ -1,208 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package eth_test - -import ( - "bytes" - - "github.com/ethereum/go-ethereum/statediff/indexer/models" - sdtypes "github.com/ethereum/go-ethereum/statediff/types" - - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - - "github.com/cerc-io/ipld-eth-server/v4/pkg/eth" - "github.com/cerc-io/ipld-eth-server/v4/pkg/eth/test_helpers" - "github.com/cerc-io/ipld-eth-server/v4/pkg/shared" -) - -var ( - filterer *eth.ResponseFilterer -) - -var _ = Describe("Filterer", func() { - Describe("FilterResponse", func() { - BeforeEach(func() { - filterer = eth.NewResponseFilterer() - }) - - It("Transcribes all the data from the IPLDPayload into the StreamPayload if given an open filter", func() { - iplds, err := filterer.Filter(openFilter, test_helpers.MockConvertedPayload) - Expect(err).ToNot(HaveOccurred()) - Expect(iplds).ToNot(BeNil()) - Expect(iplds.BlockNumber.Int64()).To(Equal(test_helpers.MockIPLDs.BlockNumber.Int64())) - Expect(iplds.Header).To(Equal(test_helpers.MockIPLDs.Header)) - var expectedEmptyUncles []models.IPLDModel - Expect(iplds.Uncles).To(Equal(expectedEmptyUncles)) - Expect(len(iplds.Transactions)).To(Equal(4)) - Expect(shared.IPLDsContainBytes(iplds.Transactions, test_helpers.Tx1)).To(BeTrue()) - Expect(shared.IPLDsContainBytes(iplds.Transactions, test_helpers.Tx2)).To(BeTrue()) - Expect(shared.IPLDsContainBytes(iplds.Transactions, test_helpers.Tx3)).To(BeTrue()) - Expect(len(iplds.Receipts)).To(Equal(4)) - Expect(shared.IPLDsContainBytes(iplds.Receipts, test_helpers.Rct1IPLD)).To(BeTrue()) - Expect(shared.IPLDsContainBytes(iplds.Receipts, test_helpers.Rct2IPLD)).To(BeTrue()) - Expect(shared.IPLDsContainBytes(iplds.Receipts, test_helpers.Rct3IPLD)).To(BeTrue()) - Expect(len(iplds.StateNodes)).To(Equal(2)) - for _, stateNode := range iplds.StateNodes { - Expect(stateNode.Type).To(Equal(sdtypes.Leaf)) - if bytes.Equal(stateNode.StateLeafKey.Bytes(), test_helpers.AccountLeafKey) { - Expect(stateNode.IPLD).To(Equal(models.IPLDModel{ - BlockNumber: test_helpers.BlockNumber.String(), - Data: test_helpers.State2IPLD.RawData(), - Key: test_helpers.State2IPLD.Cid().String(), - })) - } - if bytes.Equal(stateNode.StateLeafKey.Bytes(), test_helpers.ContractLeafKey) { - Expect(stateNode.IPLD).To(Equal(models.IPLDModel{ - BlockNumber: test_helpers.BlockNumber.String(), - Data: test_helpers.State1IPLD.RawData(), - Key: test_helpers.State1IPLD.Cid().String(), - })) - } - } - Expect(iplds.StorageNodes).To(Equal(test_helpers.MockIPLDs.StorageNodes)) - }) - - It("Applies filters from the provided config.Subscription", func() { - iplds1, err := filterer.Filter(rctAddressFilter, test_helpers.MockConvertedPayload) - Expect(err).ToNot(HaveOccurred()) - Expect(iplds1).ToNot(BeNil()) - Expect(iplds1.BlockNumber.Int64()).To(Equal(test_helpers.MockIPLDs.BlockNumber.Int64())) - Expect(iplds1.Header).To(Equal(models.IPLDModel{})) - Expect(len(iplds1.Uncles)).To(Equal(0)) - Expect(len(iplds1.Transactions)).To(Equal(0)) - Expect(len(iplds1.StorageNodes)).To(Equal(0)) - Expect(len(iplds1.StateNodes)).To(Equal(0)) - Expect(len(iplds1.Receipts)).To(Equal(1)) - Expect(iplds1.Receipts[0]).To(Equal(models.IPLDModel{ - BlockNumber: test_helpers.BlockNumber.String(), - Data: test_helpers.Rct1IPLD, - Key: test_helpers.Rct1CID.String(), - })) - - iplds2, err := filterer.Filter(rctTopicsFilter, test_helpers.MockConvertedPayload) - Expect(err).ToNot(HaveOccurred()) - Expect(iplds2).ToNot(BeNil()) - Expect(iplds2.BlockNumber.Int64()).To(Equal(test_helpers.MockIPLDs.BlockNumber.Int64())) - Expect(iplds2.Header).To(Equal(models.IPLDModel{})) - Expect(len(iplds2.Uncles)).To(Equal(0)) - Expect(len(iplds2.Transactions)).To(Equal(0)) - Expect(len(iplds2.StorageNodes)).To(Equal(0)) - Expect(len(iplds2.StateNodes)).To(Equal(0)) - Expect(len(iplds2.Receipts)).To(Equal(1)) - Expect(iplds2.Receipts[0]).To(Equal(models.IPLDModel{ - BlockNumber: test_helpers.BlockNumber.String(), - Data: test_helpers.Rct1IPLD, - Key: test_helpers.Rct1CID.String(), - })) - - iplds3, err := filterer.Filter(rctTopicsAndAddressFilter, test_helpers.MockConvertedPayload) - Expect(err).ToNot(HaveOccurred()) - Expect(iplds3).ToNot(BeNil()) - Expect(iplds3.BlockNumber.Int64()).To(Equal(test_helpers.MockIPLDs.BlockNumber.Int64())) - Expect(iplds3.Header).To(Equal(models.IPLDModel{})) - Expect(len(iplds3.Uncles)).To(Equal(0)) - Expect(len(iplds3.Transactions)).To(Equal(0)) - Expect(len(iplds3.StorageNodes)).To(Equal(0)) - Expect(len(iplds3.StateNodes)).To(Equal(0)) - Expect(len(iplds3.Receipts)).To(Equal(1)) - Expect(iplds3.Receipts[0]).To(Equal(models.IPLDModel{ - BlockNumber: test_helpers.BlockNumber.String(), - Data: test_helpers.Rct1IPLD, - Key: test_helpers.Rct1CID.String(), - })) - - iplds4, err := filterer.Filter(rctAddressesAndTopicFilter, test_helpers.MockConvertedPayload) - Expect(err).ToNot(HaveOccurred()) - Expect(iplds4).ToNot(BeNil()) - Expect(iplds4.BlockNumber.Int64()).To(Equal(test_helpers.MockIPLDs.BlockNumber.Int64())) - Expect(iplds4.Header).To(Equal(models.IPLDModel{})) - Expect(len(iplds4.Uncles)).To(Equal(0)) - Expect(len(iplds4.Transactions)).To(Equal(0)) - Expect(len(iplds4.StorageNodes)).To(Equal(0)) - Expect(len(iplds4.StateNodes)).To(Equal(0)) - Expect(len(iplds4.Receipts)).To(Equal(1)) - Expect(iplds4.Receipts[0]).To(Equal(models.IPLDModel{ - BlockNumber: test_helpers.BlockNumber.String(), - Data: test_helpers.Rct2IPLD, - Key: test_helpers.Rct2CID.String(), - })) - - iplds5, err := filterer.Filter(rctsForAllCollectedTrxs, test_helpers.MockConvertedPayload) - Expect(err).ToNot(HaveOccurred()) - Expect(iplds5).ToNot(BeNil()) - Expect(iplds5.BlockNumber.Int64()).To(Equal(test_helpers.MockIPLDs.BlockNumber.Int64())) - Expect(iplds5.Header).To(Equal(models.IPLDModel{})) - Expect(len(iplds5.Uncles)).To(Equal(0)) - Expect(len(iplds5.Transactions)).To(Equal(4)) - Expect(shared.IPLDsContainBytes(iplds5.Transactions, test_helpers.Tx1)).To(BeTrue()) - Expect(shared.IPLDsContainBytes(iplds5.Transactions, test_helpers.Tx2)).To(BeTrue()) - Expect(shared.IPLDsContainBytes(iplds5.Transactions, test_helpers.Tx3)).To(BeTrue()) - Expect(len(iplds5.StorageNodes)).To(Equal(0)) - Expect(len(iplds5.StateNodes)).To(Equal(0)) - Expect(len(iplds5.Receipts)).To(Equal(4)) - Expect(shared.IPLDsContainBytes(iplds5.Receipts, test_helpers.Rct1IPLD)).To(BeTrue()) - Expect(shared.IPLDsContainBytes(iplds5.Receipts, test_helpers.Rct2IPLD)).To(BeTrue()) - Expect(shared.IPLDsContainBytes(iplds5.Receipts, test_helpers.Rct3IPLD)).To(BeTrue()) - - iplds6, err := filterer.Filter(rctsForSelectCollectedTrxs, test_helpers.MockConvertedPayload) - Expect(err).ToNot(HaveOccurred()) - Expect(iplds6).ToNot(BeNil()) - Expect(iplds6.BlockNumber.Int64()).To(Equal(test_helpers.MockIPLDs.BlockNumber.Int64())) - Expect(iplds6.Header).To(Equal(models.IPLDModel{})) - Expect(len(iplds6.Uncles)).To(Equal(0)) - Expect(len(iplds6.Transactions)).To(Equal(1)) - Expect(shared.IPLDsContainBytes(iplds5.Transactions, test_helpers.Tx2)).To(BeTrue()) - Expect(len(iplds6.StorageNodes)).To(Equal(0)) - Expect(len(iplds6.StateNodes)).To(Equal(0)) - Expect(len(iplds6.Receipts)).To(Equal(1)) - Expect(iplds4.Receipts[0]).To(Equal(models.IPLDModel{ - BlockNumber: test_helpers.BlockNumber.String(), - Data: test_helpers.Rct2IPLD, - Key: test_helpers.Rct2CID.String(), - })) - - iplds7, err := filterer.Filter(stateFilter, test_helpers.MockConvertedPayload) - Expect(err).ToNot(HaveOccurred()) - Expect(iplds7).ToNot(BeNil()) - Expect(iplds7.BlockNumber.Int64()).To(Equal(test_helpers.MockIPLDs.BlockNumber.Int64())) - Expect(iplds7.Header).To(Equal(models.IPLDModel{})) - Expect(len(iplds7.Uncles)).To(Equal(0)) - Expect(len(iplds7.Transactions)).To(Equal(0)) - Expect(len(iplds7.StorageNodes)).To(Equal(0)) - Expect(len(iplds7.Receipts)).To(Equal(0)) - Expect(len(iplds7.StateNodes)).To(Equal(1)) - Expect(iplds7.StateNodes[0].StateLeafKey.Bytes()).To(Equal(test_helpers.AccountLeafKey)) - Expect(iplds7.StateNodes[0].IPLD).To(Equal(models.IPLDModel{ - BlockNumber: test_helpers.BlockNumber.String(), - Data: test_helpers.State2IPLD.RawData(), - Key: test_helpers.State2IPLD.Cid().String(), - })) - - iplds8, err := filterer.Filter(rctTopicsAndAddressFilterFail, test_helpers.MockConvertedPayload) - Expect(err).ToNot(HaveOccurred()) - Expect(iplds8).ToNot(BeNil()) - Expect(iplds8.BlockNumber.Int64()).To(Equal(test_helpers.MockIPLDs.BlockNumber.Int64())) - Expect(iplds8.Header).To(Equal(models.IPLDModel{})) - Expect(len(iplds8.Uncles)).To(Equal(0)) - Expect(len(iplds8.Transactions)).To(Equal(0)) - Expect(len(iplds8.StorageNodes)).To(Equal(0)) - Expect(len(iplds8.StateNodes)).To(Equal(0)) - Expect(len(iplds8.Receipts)).To(Equal(0)) - }) - }) -}) diff --git a/pkg/eth/ipld_fetcher.go b/pkg/eth/ipld_fetcher.go deleted file mode 100644 index f734f012d..000000000 --- a/pkg/eth/ipld_fetcher.go +++ /dev/null @@ -1,248 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package eth - -import ( - "errors" - "fmt" - "math/big" - "strconv" - - "github.com/cerc-io/ipld-eth-server/v4/pkg/log" - "github.com/cerc-io/ipld-eth-server/v4/pkg/shared" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/statediff/indexer/models" - "github.com/jmoiron/sqlx" -) - -// Fetcher interface for substituting mocks in tests -type Fetcher interface { - Fetch(cids CIDWrapper) (*IPLDs, error) -} - -// IPLDFetcher satisfies the IPLDFetcher interface for ethereum -// It interfaces directly with PG-IPFS -type IPLDFetcher struct { - db *sqlx.DB -} - -// NewIPLDFetcher creates a pointer to a new IPLDFetcher -func NewIPLDFetcher(db *sqlx.DB) *IPLDFetcher { - return &IPLDFetcher{ - db: db, - } -} - -// Fetch is the exported method for fetching and returning all the IPLDS specified in the CIDWrapper -func (f *IPLDFetcher) Fetch(cids CIDWrapper) (*IPLDs, error) { - log.Debug("fetching iplds") - iplds := new(IPLDs) - var ok bool - iplds.TotalDifficulty, ok = new(big.Int).SetString(cids.Header.TotalDifficulty, 10) - if !ok { - return nil, errors.New("eth fetcher: unable to set total difficulty") - } - iplds.BlockNumber = cids.BlockNumber - - tx, err := f.db.Beginx() - if err != nil { - return nil, err - } - defer func() { - if p := recover(); p != nil { - shared.Rollback(tx) - panic(p) - } else if err != nil { - shared.Rollback(tx) - } else { - err = tx.Commit() - } - }() - - iplds.Header, err = f.FetchHeader(tx, cids.Header) - if err != nil { - return nil, fmt.Errorf("eth pg fetcher: header fetching error: %s", err.Error()) - } - iplds.Uncles, err = f.FetchUncles(tx, cids.Uncles) - if err != nil { - return nil, fmt.Errorf("eth pg fetcher: uncle fetching error: %s", err.Error()) - } - iplds.Transactions, err = f.FetchTrxs(tx, cids.Transactions) - if err != nil { - return nil, fmt.Errorf("eth pg fetcher: transaction fetching error: %s", err.Error()) - } - iplds.Receipts, err = f.FetchRcts(tx, cids.Receipts) - if err != nil { - return nil, fmt.Errorf("eth pg fetcher: receipt fetching error: %s", err.Error()) - } - iplds.StateNodes, err = f.FetchState(tx, cids.StateNodes) - if err != nil { - return nil, fmt.Errorf("eth pg fetcher: state fetching error: %s", err.Error()) - } - iplds.StorageNodes, err = f.FetchStorage(tx, cids.StorageNodes) - if err != nil { - return nil, fmt.Errorf("eth pg fetcher: storage fetching error: %s", err.Error()) - } - return iplds, err -} - -// FetchHeader fetches header -func (f *IPLDFetcher) FetchHeader(tx *sqlx.Tx, c models.HeaderModel) (models.IPLDModel, error) { - log.Debug("fetching header ipld") - blockNumber, err := strconv.ParseUint(c.BlockNumber, 10, 64) - if err != nil { - return models.IPLDModel{}, err - } - - headerBytes, err := shared.FetchIPLDByMhKeyAndBlockNumber(tx, c.MhKey, blockNumber) - if err != nil { - return models.IPLDModel{}, err - } - return models.IPLDModel{ - BlockNumber: c.BlockNumber, - Data: headerBytes, - Key: c.CID, - }, nil -} - -// FetchUncles fetches uncles -func (f *IPLDFetcher) FetchUncles(tx *sqlx.Tx, cids []models.UncleModel) ([]models.IPLDModel, error) { - log.Debug("fetching uncle iplds") - uncleIPLDs := make([]models.IPLDModel, len(cids)) - for i, c := range cids { - blockNumber, err := strconv.ParseUint(c.BlockNumber, 10, 64) - if err != nil { - return nil, err - } - uncleBytes, err := shared.FetchIPLDByMhKeyAndBlockNumber(tx, c.MhKey, blockNumber) - if err != nil { - return nil, err - } - uncleIPLDs[i] = models.IPLDModel{ - BlockNumber: c.BlockNumber, - Data: uncleBytes, - Key: c.CID, - } - } - return uncleIPLDs, nil -} - -// FetchTrxs fetches transactions -func (f *IPLDFetcher) FetchTrxs(tx *sqlx.Tx, cids []models.TxModel) ([]models.IPLDModel, error) { - log.Debug("fetching transaction iplds") - trxIPLDs := make([]models.IPLDModel, len(cids)) - for i, c := range cids { - blockNumber, err := strconv.ParseUint(c.BlockNumber, 10, 64) - if err != nil { - return nil, err - } - txBytes, err := shared.FetchIPLDByMhKeyAndBlockNumber(tx, c.MhKey, blockNumber) - if err != nil { - return nil, err - } - trxIPLDs[i] = models.IPLDModel{ - BlockNumber: c.BlockNumber, - Data: txBytes, - Key: c.CID, - } - } - return trxIPLDs, nil -} - -// FetchRcts fetches receipts -func (f *IPLDFetcher) FetchRcts(tx *sqlx.Tx, cids []models.ReceiptModel) ([]models.IPLDModel, error) { - log.Debug("fetching receipt iplds") - rctIPLDs := make([]models.IPLDModel, len(cids)) - for i, c := range cids { - blockNumber, err := strconv.ParseUint(c.BlockNumber, 10, 64) - if err != nil { - return nil, err - } - rctBytes, err := shared.FetchIPLDByMhKeyAndBlockNumber(tx, c.LeafMhKey, blockNumber) - if err != nil { - return nil, err - } - //nodeVal, err := DecodeLeafNode(rctBytes) - rctIPLDs[i] = models.IPLDModel{ - BlockNumber: c.BlockNumber, - Data: rctBytes, - Key: c.LeafCID, - } - } - return rctIPLDs, nil -} - -// FetchState fetches state nodes -func (f *IPLDFetcher) FetchState(tx *sqlx.Tx, cids []models.StateNodeModel) ([]StateNode, error) { - log.Debug("fetching state iplds") - stateNodes := make([]StateNode, 0, len(cids)) - for _, stateNode := range cids { - if stateNode.CID == "" { - continue - } - blockNumber, err := strconv.ParseUint(stateNode.BlockNumber, 10, 64) - if err != nil { - return nil, err - } - stateBytes, err := shared.FetchIPLDByMhKeyAndBlockNumber(tx, stateNode.MhKey, blockNumber) - if err != nil { - return nil, err - } - stateNodes = append(stateNodes, StateNode{ - IPLD: models.IPLDModel{ - BlockNumber: stateNode.BlockNumber, - Data: stateBytes, - Key: stateNode.CID, - }, - StateLeafKey: common.HexToHash(stateNode.StateKey), - Type: ResolveToNodeType(stateNode.NodeType), - Path: stateNode.Path, - }) - } - return stateNodes, nil -} - -// FetchStorage fetches storage nodes -func (f *IPLDFetcher) FetchStorage(tx *sqlx.Tx, cids []models.StorageNodeWithStateKeyModel) ([]StorageNode, error) { - log.Debug("fetching storage iplds") - storageNodes := make([]StorageNode, 0, len(cids)) - for _, storageNode := range cids { - if storageNode.CID == "" || storageNode.StateKey == "" { - continue - } - blockNumber, err := strconv.ParseUint(storageNode.BlockNumber, 10, 64) - if err != nil { - return nil, err - } - storageBytes, err := shared.FetchIPLDByMhKeyAndBlockNumber(tx, storageNode.MhKey, blockNumber) - if err != nil { - return nil, err - } - storageNodes = append(storageNodes, StorageNode{ - IPLD: models.IPLDModel{ - BlockNumber: storageNode.BlockNumber, - Data: storageBytes, - Key: storageNode.CID, - }, - StateLeafKey: common.HexToHash(storageNode.StateKey), - StorageLeafKey: common.HexToHash(storageNode.StorageKey), - Type: ResolveToNodeType(storageNode.NodeType), - Path: storageNode.Path, - }) - } - return storageNodes, nil -} diff --git a/pkg/eth/ipld_fetcher_test.go b/pkg/eth/ipld_fetcher_test.go deleted file mode 100644 index 38d7d84cb..000000000 --- a/pkg/eth/ipld_fetcher_test.go +++ /dev/null @@ -1,75 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package eth_test - -import ( - "github.com/ethereum/go-ethereum/params" - "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" - "github.com/jmoiron/sqlx" - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - - "github.com/cerc-io/ipld-eth-server/v4/pkg/eth" - "github.com/cerc-io/ipld-eth-server/v4/pkg/eth/test_helpers" - "github.com/cerc-io/ipld-eth-server/v4/pkg/shared" -) - -var _ = Describe("IPLDFetcher", func() { - var ( - db *sqlx.DB - pubAndIndexer interfaces.StateDiffIndexer - fetcher *eth.IPLDFetcher - ) - Describe("Fetch", func() { - BeforeEach(func() { - var ( - err error - tx interfaces.Batch - ) - db = shared.SetupDB() - pubAndIndexer = shared.SetupTestStateDiffIndexer(ctx, params.TestChainConfig, test_helpers.Genesis.Hash()) - - tx, err = pubAndIndexer.PushBlock(test_helpers.MockBlock, test_helpers.MockReceipts, test_helpers.MockBlock.Difficulty()) - for _, node := range test_helpers.MockStateNodes { - err = pubAndIndexer.PushStateNode(tx, node, test_helpers.MockBlock.Hash().String()) - Expect(err).ToNot(HaveOccurred()) - } - - err = tx.Submit(err) - Expect(err).ToNot(HaveOccurred()) - fetcher = eth.NewIPLDFetcher(db) - - }) - AfterEach(func() { - shared.TearDownDB(db) - }) - - It("Fetches and returns IPLDs for the CIDs provided in the CIDWrapper", func() { - iplds, err := fetcher.Fetch(*test_helpers.MockCIDWrapper) - Expect(err).ToNot(HaveOccurred()) - Expect(iplds).ToNot(BeNil()) - Expect(iplds.TotalDifficulty).To(Equal(test_helpers.MockConvertedPayload.TotalDifficulty)) - Expect(iplds.BlockNumber).To(Equal(test_helpers.MockConvertedPayload.Block.Number())) - Expect(iplds.Header).To(Equal(test_helpers.MockIPLDs.Header)) - Expect(len(iplds.Uncles)).To(Equal(0)) - Expect(iplds.Transactions).To(Equal(test_helpers.MockIPLDs.Transactions)) - Expect(iplds.Receipts).To(Equal(test_helpers.MockIPLDs.Receipts)) - Expect(iplds.StateNodes).To(Equal(test_helpers.MockIPLDs.StateNodes)) - Expect(iplds.StorageNodes).To(Equal(test_helpers.MockIPLDs.StorageNodes)) - }) - }) -}) diff --git a/pkg/eth/ipld_retriever.go b/pkg/eth/ipld_retriever.go index 3cfb65301..51a12cb44 100644 --- a/pkg/eth/ipld_retriever.go +++ b/pkg/eth/ipld_retriever.go @@ -28,24 +28,9 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/rlp" - "github.com/lib/pq" ) const ( - RetrieveHeadersByHashesPgStr = `SELECT cid, data - FROM eth.header_cids - INNER JOIN public.blocks ON ( - header_cids.mh_key = blocks.key - AND header_cids.block_number = blocks.block_number - ) - WHERE block_hash = ANY($1::VARCHAR(66)[])` - RetrieveHeadersByBlockNumberPgStr = `SELECT cid, data - FROM eth.header_cids - INNER JOIN public.blocks ON ( - header_cids.mh_key = blocks.key - AND header_cids.block_number = blocks.block_number - ) - WHERE header_cids.block_number = $1` RetrieveHeaderByHashPgStr = `SELECT cid, data FROM eth.header_cids INNER JOIN public.blocks ON ( @@ -53,13 +38,6 @@ const ( AND header_cids.block_number = blocks.block_number ) WHERE block_hash = $1` - RetrieveUnclesByHashesPgStr = `SELECT cid, data - FROM eth.uncle_cids - INNER JOIN public.blocks ON ( - uncle_cids.mh_key = blocks.key - AND uncle_cids.block_number = blocks.block_number - ) - WHERE block_hash = ANY($1::VARCHAR(66)[])` RetrieveUnclesPgStr = `SELECT uncle_cids.cid, data FROM eth.uncle_cids INNER JOIN eth.header_cids ON ( @@ -85,31 +63,6 @@ const ( ) WHERE header_cids.block_hash = $1 ORDER BY uncle_cids.parent_hash` - RetrieveUnclesByBlockNumberPgStr = `SELECT uncle_cids.cid, data - FROM eth.uncle_cids - INNER JOIN eth.header_cids ON ( - uncle_cids.header_id = header_cids.block_hash - AND uncle_cids.block_number = header_cids.block_number - ) - INNER JOIN public.blocks ON ( - uncle_cids.mh_key = blocks.key - AND uncle_cids.block_number = blocks.block_number - ) - WHERE header_cids.block_number = $1` - RetrieveUncleByHashPgStr = `SELECT cid, data - FROM eth.uncle_cids - INNER JOIN public.blocks ON ( - uncle_cids.mh_key = blocks.key - AND uncle_cids.block_number = blocks.block_number - ) - WHERE block_hash = $1` - RetrieveTransactionsByHashesPgStr = `SELECT DISTINCT ON (tx_hash) cid, data - FROM eth.transaction_cids - INNER JOIN public.blocks ON ( - transaction_cids.mh_key = blocks.key - AND transaction_cids.block_number = blocks.block_number - ) - WHERE tx_hash = ANY($1::VARCHAR(66)[])` RetrieveTransactionsPgStr = `SELECT transaction_cids.cid, data FROM eth.transaction_cids INNER JOIN eth.header_cids ON ( @@ -135,39 +88,6 @@ const ( ) WHERE block_hash = $1 ORDER BY eth.transaction_cids.index ASC` - RetrieveTransactionsByBlockNumberPgStr = `SELECT transaction_cids.cid, data - FROM eth.transaction_cids - INNER JOIN eth.header_cids ON ( - transaction_cids.header_id = header_cids.block_hash - AND transaction_cids.block_number = header_cids.block_number - ) - INNER JOIN public.blocks ON ( - transaction_cids.mh_key = blocks.key - AND transaction_cids.block_number = blocks.block_number - ) - WHERE header_cids.block_number = $1 - AND block_hash = (SELECT canonical_header_hash(header_cids.block_number)) - ORDER BY eth.transaction_cids.index ASC` - RetrieveTransactionByHashPgStr = `SELECT DISTINCT ON (tx_hash) cid, data - FROM eth.transaction_cids - INNER JOIN public.blocks ON ( - transaction_cids.mh_key = blocks.key - AND transaction_cids.block_number = blocks.block_number - ) - WHERE tx_hash = $1` - RetrieveReceiptsByTxHashesPgStr = `SELECT receipt_cids.leaf_cid, data - FROM eth.receipt_cids - INNER JOIN eth.transaction_cids ON ( - receipt_cids.tx_id = transaction_cids.tx_hash - AND receipt_cids.header_id = transaction_cids.header_id - AND receipt_cids.block_number = transaction_cids.block_number - ) - INNER JOIN public.blocks ON ( - receipt_cids.leaf_mh_key = blocks.key - AND receipt_cids.block_number = blocks.block_number - ) - WHERE tx_hash = ANY($1::VARCHAR(66)[]) - AND transaction_cids.header_id = (SELECT canonical_header_hash(transaction_cids.block_number))` RetrieveReceiptsPgStr = `SELECT receipt_cids.leaf_cid, data, eth.transaction_cids.tx_hash FROM eth.receipt_cids INNER JOIN eth.transaction_cids ON ( @@ -203,37 +123,6 @@ const ( ) WHERE block_hash = $1 ORDER BY eth.transaction_cids.index ASC` - RetrieveReceiptsByBlockNumberPgStr = `SELECT receipt_cids.leaf_cid, data - FROM eth.receipt_cids - INNER JOIN eth.transaction_cids ON ( - receipt_cids.tx_id = transaction_cids.tx_hash - AND receipt_cids.header_id = transaction_cids.header_id - AND receipt_cids.block_number = transaction_cids.block_number - ) - INNER JOIN eth.header_cids ON ( - transaction_cids.header_id = header_cids.block_hash - AND transaction_cids.block_number = header_cids.block_number - ) - INNER JOIN public.blocks ON ( - receipt_cids.leaf_mh_key = blocks.key - AND receipt_cids.block_number = blocks.block_number - ) - WHERE header_cids.block_number = $1 - AND block_hash = (SELECT canonical_header_hash(header_cids.block_number)) - ORDER BY eth.transaction_cids.index ASC` - RetrieveReceiptByTxHashPgStr = `SELECT receipt_cids.leaf_cid, data - FROM eth.receipt_cids - INNER JOIN eth.transaction_cids ON ( - receipt_cids.tx_id = transaction_cids.tx_hash - AND receipt_cids.header_id = transaction_cids.header_id - AND receipt_cids.block_number = transaction_cids.block_number - ) - INNER JOIN public.blocks ON ( - receipt_cids.leaf_mh_key = blocks.key - AND receipt_cids.block_number = blocks.block_number - ) - WHERE tx_hash = $1 - AND transaction_cids.header_id = (SELECT canonical_header_hash(transaction_cids.block_number))` RetrieveAccountByLeafKeyAndBlockHashPgStr = `SELECT state_cids.cid, state_cids.mh_key, state_cids.block_number, state_cids.node_type FROM eth.state_cids INNER JOIN eth.header_cids ON ( @@ -247,18 +136,7 @@ const ( AND header_cids.block_hash = (SELECT canonical_header_hash(header_cids.block_number)) ORDER BY header_cids.block_number DESC LIMIT 1` - RetrieveAccountByLeafKeyAndBlockNumberPgStr = `SELECT state_cids.cid, state_cids.mh_key, state_cids.node_type - FROM eth.state_cids - INNER JOIN eth.header_cids ON ( - state_cids.header_id = header_cids.block_hash - AND state_cids.block_number = header_cids.block_number - ) - WHERE state_leaf_key = $1 - AND header_cids.block_number <= $2 - ORDER BY header_cids.block_number DESC - LIMIT 1` - RetrieveStorageLeafByAddressHashAndLeafKeyAndBlockNumberPgStr = `SELECT cid, mh_key, block_number, node_type, state_leaf_removed FROM get_storage_at_by_number($1, $2, $3)` - RetrieveStorageLeafByAddressHashAndLeafKeyAndBlockHashPgStr = `SELECT cid, mh_key, block_number, node_type, state_leaf_removed FROM get_storage_at_by_hash($1, $2, $3)` + RetrieveStorageLeafByAddressHashAndLeafKeyAndBlockHashPgStr = `SELECT cid, mh_key, block_number, node_type, state_leaf_removed FROM get_storage_at_by_hash($1, $2, $3)` ) var EmptyNodeValue = make([]byte, common.HashLength) @@ -285,66 +163,12 @@ func NewIPLDRetriever(db *sqlx.DB) *IPLDRetriever { } } -// RetrieveHeadersByHashes returns the cids and rlp bytes for the headers corresponding to the provided block hashes -func (r *IPLDRetriever) RetrieveHeadersByHashes(hashes []common.Hash) ([]string, [][]byte, error) { - headerResults := make([]ipldResult, 0) - hashStrs := make([]string, len(hashes)) - for i, hash := range hashes { - hashStrs[i] = hash.Hex() - } - if err := r.db.Select(&headerResults, RetrieveHeadersByHashesPgStr, pq.Array(hashStrs)); err != nil { - return nil, nil, err - } - cids := make([]string, len(headerResults)) - headers := make([][]byte, len(headerResults)) - for i, res := range headerResults { - cids[i] = res.CID - headers[i] = res.Data - } - return cids, headers, nil -} - -// RetrieveHeadersByBlockNumber returns the cids and rlp bytes for the headers corresponding to the provided block number -// This can return more than one result since there can be more than one header (non-canonical headers) -func (r *IPLDRetriever) RetrieveHeadersByBlockNumber(number uint64) ([]string, [][]byte, error) { - headerResults := make([]ipldResult, 0) - if err := r.db.Select(&headerResults, RetrieveHeadersByBlockNumberPgStr, number); err != nil { - return nil, nil, err - } - cids := make([]string, len(headerResults)) - headers := make([][]byte, len(headerResults)) - for i, res := range headerResults { - cids[i] = res.CID - headers[i] = res.Data - } - return cids, headers, nil -} - // RetrieveHeaderByHash returns the cid and rlp bytes for the header corresponding to the provided block hash func (r *IPLDRetriever) RetrieveHeaderByHash(tx *sqlx.Tx, hash common.Hash) (string, []byte, error) { headerResult := new(ipldResult) return headerResult.CID, headerResult.Data, tx.Get(headerResult, RetrieveHeaderByHashPgStr, hash.Hex()) } -// RetrieveUnclesByHashes returns the cids and rlp bytes for the uncles corresponding to the provided uncle hashes -func (r *IPLDRetriever) RetrieveUnclesByHashes(hashes []common.Hash) ([]string, [][]byte, error) { - uncleResults := make([]ipldResult, 0) - hashStrs := make([]string, len(hashes)) - for i, hash := range hashes { - hashStrs[i] = hash.Hex() - } - if err := r.db.Select(&uncleResults, RetrieveUnclesByHashesPgStr, pq.Array(hashStrs)); err != nil { - return nil, nil, err - } - cids := make([]string, len(uncleResults)) - uncles := make([][]byte, len(uncleResults)) - for i, res := range uncleResults { - cids[i] = res.CID - uncles[i] = res.Data - } - return cids, uncles, nil -} - // RetrieveUncles returns the cids and rlp bytes for the uncles corresponding to the provided block hash, number (of non-omner root block) func (r *IPLDRetriever) RetrieveUncles(tx *sqlx.Tx, hash common.Hash, number uint64) ([]string, [][]byte, error) { uncleResults := make([]ipldResult, 0) @@ -375,46 +199,6 @@ func (r *IPLDRetriever) RetrieveUnclesByBlockHash(tx *sqlx.Tx, hash common.Hash) return cids, uncles, nil } -// RetrieveUnclesByBlockNumber returns the cids and rlp bytes for the uncles corresponding to the provided block number (of non-omner root block) -func (r *IPLDRetriever) RetrieveUnclesByBlockNumber(number uint64) ([]string, [][]byte, error) { - uncleResults := make([]ipldResult, 0) - if err := r.db.Select(&uncleResults, RetrieveUnclesByBlockNumberPgStr, number); err != nil { - return nil, nil, err - } - cids := make([]string, len(uncleResults)) - uncles := make([][]byte, len(uncleResults)) - for i, res := range uncleResults { - cids[i] = res.CID - uncles[i] = res.Data - } - return cids, uncles, nil -} - -// RetrieveUncleByHash returns the cid and rlp bytes for the uncle corresponding to the provided uncle hash -func (r *IPLDRetriever) RetrieveUncleByHash(hash common.Hash) (string, []byte, error) { - uncleResult := new(ipldResult) - return uncleResult.CID, uncleResult.Data, r.db.Get(uncleResult, RetrieveUncleByHashPgStr, hash.Hex()) -} - -// RetrieveTransactionsByHashes returns the cids and rlp bytes for the transactions corresponding to the provided tx hashes -func (r *IPLDRetriever) RetrieveTransactionsByHashes(hashes []common.Hash) ([]string, [][]byte, error) { - txResults := make([]ipldResult, 0) - hashStrs := make([]string, len(hashes)) - for i, hash := range hashes { - hashStrs[i] = hash.Hex() - } - if err := r.db.Select(&txResults, RetrieveTransactionsByHashesPgStr, pq.Array(hashStrs)); err != nil { - return nil, nil, err - } - cids := make([]string, len(txResults)) - txs := make([][]byte, len(txResults)) - for i, res := range txResults { - cids[i] = res.CID - txs[i] = res.Data - } - return cids, txs, nil -} - // RetrieveTransactions returns the cids and rlp bytes for the transactions corresponding to the provided block hash, number func (r *IPLDRetriever) RetrieveTransactions(tx *sqlx.Tx, hash common.Hash, number uint64) ([]string, [][]byte, error) { txResults := make([]ipldResult, 0) @@ -445,27 +229,6 @@ func (r *IPLDRetriever) RetrieveTransactionsByBlockHash(tx *sqlx.Tx, hash common return cids, txs, nil } -// RetrieveTransactionsByBlockNumber returns the cids and rlp bytes for the transactions corresponding to the provided block number -func (r *IPLDRetriever) RetrieveTransactionsByBlockNumber(number uint64) ([]string, [][]byte, error) { - txResults := make([]ipldResult, 0) - if err := r.db.Select(&txResults, RetrieveTransactionsByBlockNumberPgStr, number); err != nil { - return nil, nil, err - } - cids := make([]string, len(txResults)) - txs := make([][]byte, len(txResults)) - for i, res := range txResults { - cids[i] = res.CID - txs[i] = res.Data - } - return cids, txs, nil -} - -// RetrieveTransactionByTxHash returns the cid and rlp bytes for the transaction corresponding to the provided tx hash -func (r *IPLDRetriever) RetrieveTransactionByTxHash(hash common.Hash) (string, []byte, error) { - txResult := new(ipldResult) - return txResult.CID, txResult.Data, r.db.Get(txResult, RetrieveTransactionByHashPgStr, hash.Hex()) -} - // DecodeLeafNode decodes the leaf node data func DecodeLeafNode(node []byte) ([]byte, error) { var nodeElements []interface{} @@ -483,29 +246,6 @@ func DecodeLeafNode(node []byte) ([]byte, error) { return nodeElements[1].([]byte), nil } -// RetrieveReceiptsByTxHashes returns the cids and rlp bytes for the receipts corresponding to the provided tx hashes -func (r *IPLDRetriever) RetrieveReceiptsByTxHashes(hashes []common.Hash) ([]string, [][]byte, error) { - rctResults := make([]rctIpldResult, 0) - hashStrs := make([]string, len(hashes)) - for i, hash := range hashes { - hashStrs[i] = hash.Hex() - } - if err := r.db.Select(&rctResults, RetrieveReceiptsByTxHashesPgStr, pq.Array(hashStrs)); err != nil { - return nil, nil, err - } - cids := make([]string, len(rctResults)) - rcts := make([][]byte, len(rctResults)) - for i, res := range rctResults { - cids[i] = res.LeafCID - nodeVal, err := DecodeLeafNode(res.Data) - if err != nil { - return nil, nil, err - } - rcts[i] = nodeVal - } - return cids, rcts, nil -} - // RetrieveReceipts returns the cids and rlp bytes for the receipts corresponding to the provided block hash, number. // cid returned corresponds to the leaf node data which contains the receipt. func (r *IPLDRetriever) RetrieveReceipts(tx *sqlx.Tx, hash common.Hash, number uint64) ([]string, [][]byte, []common.Hash, error) { @@ -554,41 +294,6 @@ func (r *IPLDRetriever) RetrieveReceiptsByBlockHash(tx *sqlx.Tx, hash common.Has return cids, rcts, txs, nil } -// RetrieveReceiptsByBlockNumber returns the cids and rlp bytes for the receipts corresponding to the provided block hash. -// cid returned corresponds to the leaf node data which contains the receipt. -func (r *IPLDRetriever) RetrieveReceiptsByBlockNumber(number uint64) ([]string, [][]byte, error) { - rctResults := make([]rctIpldResult, 0) - if err := r.db.Select(&rctResults, RetrieveReceiptsByBlockNumberPgStr, number); err != nil { - return nil, nil, err - } - cids := make([]string, len(rctResults)) - rcts := make([][]byte, len(rctResults)) - for i, res := range rctResults { - cids[i] = res.LeafCID - nodeVal, err := DecodeLeafNode(res.Data) - if err != nil { - return nil, nil, err - } - rcts[i] = nodeVal - } - return cids, rcts, nil -} - -// RetrieveReceiptByHash returns the cid and rlp bytes for the receipt corresponding to the provided tx hash. -// cid returned corresponds to the leaf node data which contains the receipt. -func (r *IPLDRetriever) RetrieveReceiptByHash(hash common.Hash) (string, []byte, error) { - rctResult := new(rctIpldResult) - if err := r.db.Select(&rctResult, RetrieveReceiptByTxHashPgStr, hash.Hex()); err != nil { - return "", nil, err - } - - nodeVal, err := DecodeLeafNode(rctResult.Data) - if err != nil { - return "", nil, err - } - return rctResult.LeafCID, nodeVal, nil -} - type nodeInfo struct { CID string `db:"cid"` MhKey string `db:"mh_key"` @@ -630,35 +335,6 @@ func (r *IPLDRetriever) RetrieveAccountByAddressAndBlockHash(address common.Addr return accountResult.CID, i[1].([]byte), nil } -// RetrieveAccountByAddressAndBlockNumber returns the cid and rlp bytes for the account corresponding to the provided address and block number -// This can return a non-canonical account -func (r *IPLDRetriever) RetrieveAccountByAddressAndBlockNumber(address common.Address, number uint64) (string, []byte, error) { - accountResult := new(nodeInfo) - leafKey := crypto.Keccak256Hash(address.Bytes()) - if err := r.db.Get(accountResult, RetrieveAccountByLeafKeyAndBlockNumberPgStr, leafKey.Hex(), number); err != nil { - return "", nil, err - } - - if accountResult.NodeType == sdtypes.Removed.Int() { - return "", EmptyNodeValue, nil - } - - var err error - accountResult.Data, err = shared.FetchIPLD(r.db, accountResult.MhKey, number) - if err != nil { - return "", nil, err - } - - var i []interface{} - if err := rlp.DecodeBytes(accountResult.Data, &i); err != nil { - return "", nil, fmt.Errorf("error decoding state leaf node rlp: %s", err.Error()) - } - if len(i) != 2 { - return "", nil, fmt.Errorf("eth IPLDRetriever expected state leaf node rlp to decode into two elements") - } - return accountResult.CID, i[1].([]byte), nil -} - // RetrieveStorageAtByAddressAndStorageSlotAndBlockHash returns the cid and rlp bytes for the storage value corresponding to the provided address, storage slot, and block hash func (r *IPLDRetriever) RetrieveStorageAtByAddressAndStorageSlotAndBlockHash(address common.Address, key, hash common.Hash) (string, []byte, []byte, error) { storageResult := new(nodeInfo) @@ -690,32 +366,3 @@ func (r *IPLDRetriever) RetrieveStorageAtByAddressAndStorageSlotAndBlockHash(add } return storageResult.CID, storageResult.Data, i[1].([]byte), nil } - -// RetrieveStorageAtByAddressAndStorageKeyAndBlockNumber returns the cid and rlp bytes for the storage value corresponding to the provided address, storage key, and block number -// This can retrun a non-canonical value -func (r *IPLDRetriever) RetrieveStorageAtByAddressAndStorageKeyAndBlockNumber(address common.Address, storageLeafKey common.Hash, number uint64) (string, []byte, error) { - storageResult := new(nodeInfo) - stateLeafKey := crypto.Keccak256Hash(address.Bytes()) - if err := r.db.Get(storageResult, RetrieveStorageLeafByAddressHashAndLeafKeyAndBlockNumberPgStr, stateLeafKey.Hex(), storageLeafKey.Hex(), number); err != nil { - return "", nil, err - } - - if storageResult.StateLeafRemoved || storageResult.NodeType == sdtypes.Removed.Int() { - return "", EmptyNodeValue, nil - } - - var err error - storageResult.Data, err = shared.FetchIPLD(r.db, storageResult.MhKey, number) - if err != nil { - return "", nil, err - } - - var i []interface{} - if err := rlp.DecodeBytes(storageResult.Data, &i); err != nil { - return "", nil, fmt.Errorf("error decoding storage leaf node rlp: %s", err.Error()) - } - if len(i) != 2 { - return "", nil, fmt.Errorf("eth IPLDRetriever expected storage leaf node rlp to decode into two elements") - } - return storageResult.CID, i[1].([]byte), nil -} diff --git a/pkg/eth/subscription_config.go b/pkg/eth/subscription_config.go index d74ad3fd9..24b4f48eb 100644 --- a/pkg/eth/subscription_config.go +++ b/pkg/eth/subscription_config.go @@ -16,38 +16,6 @@ package eth -import ( - "math/big" - - "github.com/spf13/viper" -) - -// SubscriptionSettings config is used by a subscriber to specify what eth data to stream from the watcher -type SubscriptionSettings struct { - BackFill bool - BackFillOnly bool - Start *big.Int - End *big.Int // set to 0 or a negative value to have no ending block - HeaderFilter HeaderFilter - TxFilter TxFilter - ReceiptFilter ReceiptFilter - StateFilter StateFilter - StorageFilter StorageFilter -} - -// HeaderFilter contains filter settings for headers -type HeaderFilter struct { - Off bool - Uncles bool -} - -// TxFilter contains filter settings for txs -type TxFilter struct { - Off bool - Src []string - Dst []string -} - // ReceiptFilter contains filter settings for receipts type ReceiptFilter struct { Off bool @@ -56,70 +24,3 @@ type ReceiptFilter struct { LogAddresses []string // receipt contains logs from the provided addresses Topics [][]string } - -// StateFilter contains filter settings for state -type StateFilter struct { - Off bool - Addresses []string // is converted to state key by taking its keccak256 hash - IntermediateNodes bool -} - -// StorageFilter contains filter settings for storage -type StorageFilter struct { - Off bool - Addresses []string - StorageKeys []string // need to be the hashs key themselves not slot position - IntermediateNodes bool -} - -// Init is used to initialize a EthSubscription struct with env variables -func NewEthSubscriptionConfig() (*SubscriptionSettings, error) { - sc := new(SubscriptionSettings) - // Below default to false, which means we do not backfill by default - sc.BackFill = viper.GetBool("watcher.ethSubscription.historicalData") - sc.BackFillOnly = viper.GetBool("watcher.ethSubscription.historicalDataOnly") - // Below default to 0 - // 0 start means we start at the beginning and 0 end means we continue indefinitely - sc.Start = big.NewInt(viper.GetInt64("watcher.ethSubscription.startingBlock")) - sc.End = big.NewInt(viper.GetInt64("watcher.ethSubscription.endingBlock")) - // Below default to false, which means we get all headers and no uncles by default - sc.HeaderFilter = HeaderFilter{ - Off: viper.GetBool("watcher.ethSubscription.headerFilter.off"), - Uncles: viper.GetBool("watcher.ethSubscription.headerFilter.uncles"), - } - // Below defaults to false and two slices of length 0 - // Which means we get all transactions by default - sc.TxFilter = TxFilter{ - Off: viper.GetBool("watcher.ethSubscription.txFilter.off"), - Src: viper.GetStringSlice("watcher.ethSubscription.txFilter.src"), - Dst: viper.GetStringSlice("watcher.ethSubscription.txFilter.dst"), - } - // By default all of the topic slices will be empty => match on any/all topics - topics := make([][]string, 4) - topics[0] = viper.GetStringSlice("watcher.ethSubscription.receiptFilter.topic0s") - topics[1] = viper.GetStringSlice("watcher.ethSubscription.receiptFilter.topic1s") - topics[2] = viper.GetStringSlice("watcher.ethSubscription.receiptFilter.topic2s") - topics[3] = viper.GetStringSlice("watcher.ethSubscription.receiptFilter.topic3s") - sc.ReceiptFilter = ReceiptFilter{ - Off: viper.GetBool("watcher.ethSubscription.receiptFilter.off"), - MatchTxs: viper.GetBool("watcher.ethSubscription.receiptFilter.matchTxs"), - LogAddresses: viper.GetStringSlice("watcher.ethSubscription.receiptFilter.contracts"), - Topics: topics, - } - // Below defaults to two false, and a slice of length 0 - // Which means we get all state leafs by default, but no intermediate nodes - sc.StateFilter = StateFilter{ - Off: viper.GetBool("watcher.ethSubscription.stateFilter.off"), - IntermediateNodes: viper.GetBool("watcher.ethSubscription.stateFilter.intermediateNodes"), - Addresses: viper.GetStringSlice("watcher.ethSubscription.stateFilter.addresses"), - } - // Below defaults to two false, and two slices of length 0 - // Which means we get all storage leafs by default, but no intermediate nodes - sc.StorageFilter = StorageFilter{ - Off: viper.GetBool("watcher.ethSubscription.storageFilter.off"), - IntermediateNodes: viper.GetBool("watcher.ethSubscription.storageFilter.intermediateNodes"), - Addresses: viper.GetStringSlice("watcher.ethSubscription.storageFilter.addresses"), - StorageKeys: viper.GetStringSlice("watcher.ethSubscription.storageFilter.storageKeys"), - } - return sc, nil -} diff --git a/pkg/eth/test_helpers/chain_maker.go b/pkg/eth/test_helpers/chain_maker.go index 4f24c888a..a3a7369d7 100644 --- a/pkg/eth/test_helpers/chain_maker.go +++ b/pkg/eth/test_helpers/chain_maker.go @@ -17,6 +17,7 @@ package test_helpers import ( + "bytes" "math/big" "github.com/ethereum/go-ethereum/common" @@ -27,7 +28,10 @@ import ( "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/statediff/indexer/ipld" "github.com/ethereum/go-ethereum/statediff/test_helpers" + "github.com/ipfs/go-cid" ) // Test variables @@ -106,3 +110,40 @@ func TestChainGen(i int, block *core.BlockGen) { block.AddTx(tx) } } + +// GetRctLeafNodeData converts the receipts to receipt trie and returns the receipt leaf node IPLD data and +// corresponding CIDs +func GetRctLeafNodeData(rcts types.Receipts) ([]cid.Cid, [][]byte, error) { + receiptTrie := ipld.NewRctTrie() + for idx, rct := range rcts { + ethRct, err := ipld.NewReceipt(rct) + if err != nil { + return nil, nil, err + } + if err = receiptTrie.Add(idx, ethRct.RawData()); err != nil { + return nil, nil, err + } + } + + rctLeafNodes, keys, err := receiptTrie.GetLeafNodes() + if err != nil { + return nil, nil, err + } + + ethRctleafNodeCids := make([]cid.Cid, len(rctLeafNodes)) + ethRctleafNodeData := make([][]byte, len(rctLeafNodes)) + for i, rln := range rctLeafNodes { + var idx uint + + r := bytes.NewReader(keys[i].TrieKey) + err = rlp.Decode(r, &idx) + if err != nil { + return nil, nil, err + } + + ethRctleafNodeCids[idx] = rln.Cid() + ethRctleafNodeData[idx] = rln.RawData() + } + + return ethRctleafNodeCids, ethRctleafNodeData, nil +} diff --git a/pkg/eth/test_helpers/test_data.go b/pkg/eth/test_helpers/test_data.go index 78e7bac8b..2e5d40b97 100644 --- a/pkg/eth/test_helpers/test_data.go +++ b/pkg/eth/test_helpers/test_data.go @@ -162,7 +162,7 @@ var ( Tx3 = GetTxnRlp(2, MockTransactions) Tx4 = GetTxnRlp(3, MockTransactions) - rctCIDs, rctIPLDData, _ = eth.GetRctLeafNodeData(MockReceipts) + rctCIDs, rctIPLDData, _ = GetRctLeafNodeData(MockReceipts) HeaderCID, _ = ipld.RawdataToCid(ipld.MEthHeader, MockHeaderRlp, multihash.KECCAK_256) HeaderMhKey = shared.MultihashKeyFromCID(HeaderCID) Trx1CID, _ = ipld.RawdataToCid(ipld.MEthTx, Tx1, multihash.KECCAK_256) diff --git a/pkg/serve/api.go b/pkg/serve/api.go index d83970410..d4f7352e9 100644 --- a/pkg/serve/api.go +++ b/pkg/serve/api.go @@ -17,13 +17,8 @@ package serve import ( - "context" - - "github.com/cerc-io/ipld-eth-server/v4/pkg/log" "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/statediff/types" - - "github.com/cerc-io/ipld-eth-server/v4/pkg/eth" ) // APIName is the namespace used for the state diffing service API @@ -46,45 +41,6 @@ func NewPublicServerAPI(w Server, client *rpc.Client) *PublicServerAPI { } } -// Stream is the public method to setup a subscription that fires off IPLD payloads as they are processed -func (api *PublicServerAPI) Stream(ctx context.Context, params eth.SubscriptionSettings) (*rpc.Subscription, error) { - // ensure that the RPC connection supports subscriptions - notifier, supported := rpc.NotifierFromContext(ctx) - if !supported { - return nil, rpc.ErrNotificationsUnsupported - } - - // create subscription and start waiting for stream events - rpcSub := notifier.CreateSubscription() - - go func() { - // subscribe to events from the SyncPublishScreenAndServe service - payloadChannel := make(chan SubscriptionPayload, PayloadChanBufferSize) - quitChan := make(chan bool, 1) - go api.w.Subscribe(rpcSub.ID, payloadChannel, quitChan, params) - - // loop and await payloads and relay them to the subscriber using notifier - for { - select { - case packet := <-payloadChannel: - if err := notifier.Notify(rpcSub.ID, packet); err != nil { - log.Error("Failed to send watcher data packet", "err", err) - api.w.Unsubscribe(rpcSub.ID) - return - } - case <-rpcSub.Err(): - api.w.Unsubscribe(rpcSub.ID) - return - case <-quitChan: - // don't need to unsubscribe from the watcher, the service does so before sending the quit signal this way - return - } - } - }() - - return rpcSub, nil -} - // WatchAddress makes a geth WatchAddress API call with the given operation and args func (api *PublicServerAPI) WatchAddress(operation types.OperationType, args []types.WatchAddressArg) error { err := api.rpc.Call(nil, "statediff_watchAddress", operation, args) diff --git a/pkg/serve/helpers.go b/pkg/serve/helpers.go deleted file mode 100644 index 358568a3e..000000000 --- a/pkg/serve/helpers.go +++ /dev/null @@ -1,37 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package serve - -import "github.com/cerc-io/ipld-eth-server/v4/pkg/log" - -func sendNonBlockingErr(sub Subscription, err error) { - log.Error(err) - select { - case sub.PayloadChan <- SubscriptionPayload{Data: nil, Err: err.Error(), Flag: EmptyFlag}: - default: - log.Infof("unable to send error to subscription %s", sub.ID) - } -} - -func sendNonBlockingQuit(sub Subscription) { - select { - case sub.QuitChan <- true: - log.Infof("closing subscription %s", sub.ID) - default: - log.Infof("unable to close subscription %s; channel has no receiver", sub.ID) - } -} diff --git a/pkg/serve/service.go b/pkg/serve/service.go index d6c6feae4..5d92a9f8e 100644 --- a/pkg/serve/service.go +++ b/pkg/serve/service.go @@ -17,19 +17,15 @@ package serve import ( - "fmt" "strconv" "sync" "time" "github.com/cerc-io/ipld-eth-server/v4/pkg/log" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/vm" - "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/eth/tracers" ethnode "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" - "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" "github.com/jmoiron/sqlx" @@ -51,11 +47,7 @@ type Server interface { APIs() []rpc.API Protocols() []p2p.Protocol // Pub-Sub handling event loop - Serve(wg *sync.WaitGroup, screenAndServePayload <-chan eth.ConvertedPayload) - // Method to subscribe to the service - Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params eth.SubscriptionSettings) - // Method to unsubscribe from the service - Unsubscribe(id rpc.ID) + Serve(wg *sync.WaitGroup) // Backend exposes the server's backend Backend() *eth.Backend } @@ -64,22 +56,10 @@ type Server interface { type Service struct { // Used to sync access to the Subscriptions sync.Mutex - // Interface for filtering and serving data according to subscribed clients according to their specification - Filterer eth.Filterer - // Interface for fetching IPLD objects from IPFS - IPLDFetcher eth.Fetcher - // Interface for searching and retrieving CIDs from Postgres index - Retriever eth.Retriever // Used to signal shutdown of the service QuitChan chan bool - // A mapping of rpc.IDs to their subscription channels, mapped to their subscription type (hash of the StreamFilters) - Subscriptions map[common.Hash]map[rpc.ID]Subscription - // A mapping of subscription params hash to the corresponding subscription params - SubscriptionTypes map[common.Hash]eth.SubscriptionSettings // Underlying db db *sqlx.DB - // wg for syncing serve processes - serveWg *sync.WaitGroup // rpc client for forwarding cache misses client *rpc.Client // whether the proxied client supports state diffing @@ -101,13 +81,8 @@ type Service struct { // NewServer creates a new Server using an underlying Service struct func NewServer(settings *Config) (Server, error) { sap := new(Service) - sap.Retriever = eth.NewCIDRetriever(settings.DB) - sap.IPLDFetcher = eth.NewIPLDFetcher(settings.DB) - sap.Filterer = eth.NewResponseFilterer() sap.db = settings.DB sap.QuitChan = make(chan bool) - sap.Subscriptions = make(map[common.Hash]map[rpc.ID]Subscription) - sap.SubscriptionTypes = make(map[common.Hash]eth.SubscriptionSettings) sap.client = settings.Client sap.supportsStateDiffing = settings.SupportStateDiff sap.stateDiffTimeout = settings.StateDiffTimeout @@ -177,200 +152,22 @@ func (sap *Service) APIs() []rpc.API { // It filters and sends this data to any subscribers to the service // This process can also be stood up alone, without an screenAndServePayload attached to a Sync process // and it will hang on the WaitGroup indefinitely, allowing the Service to serve historical data requests only -func (sap *Service) Serve(wg *sync.WaitGroup, screenAndServePayload <-chan eth.ConvertedPayload) { - sap.serveWg = wg +func (sap *Service) Serve(wg *sync.WaitGroup) { go func() { wg.Add(1) defer wg.Done() - for { - select { - case payload := <-screenAndServePayload: - sap.filterAndServe(payload) - case <-sap.QuitChan: - log.Info("quiting eth ipld server process") - return - } - } + <-sap.QuitChan + log.Info("quiting eth ipld server process") }() log.Info("eth ipld server process successfully spun up") } -// filterAndServe filters the payload according to each subscription type and sends to the subscriptions -func (sap *Service) filterAndServe(payload eth.ConvertedPayload) { - log.Debug("sending eth ipld payload to subscriptions") - sap.Lock() - sap.serveWg.Add(1) - defer sap.Unlock() - defer sap.serveWg.Done() - for ty, subs := range sap.Subscriptions { - // Retrieve the subscription parameters for this subscription type - subConfig, ok := sap.SubscriptionTypes[ty] - if !ok { - log.Errorf("eth ipld server subscription configuration for subscription type %s not available", ty.Hex()) - sap.closeType(ty) - continue - } - if subConfig.End.Int64() > 0 && subConfig.End.Int64() < payload.Block.Number().Int64() { - // We are not out of range for this subscription type - // close it, and continue to the next - sap.closeType(ty) - continue - } - response, err := sap.Filterer.Filter(subConfig, payload) - if err != nil { - log.Errorf("eth ipld server filtering error: %v", err) - sap.closeType(ty) - continue - } - responseRLP, err := rlp.EncodeToBytes(response) - if err != nil { - log.Errorf("eth ipld server rlp encoding error: %v", err) - continue - } - for id, sub := range subs { - select { - case sub.PayloadChan <- SubscriptionPayload{Data: responseRLP, Err: "", Flag: EmptyFlag, Height: response.BlockNumber.Int64()}: - log.Debugf("sending eth ipld server payload to subscription %s", id) - default: - log.Infof("unable to send eth ipld payload to subscription %s; channel has no receiver", id) - } - } - } -} - -// Subscribe is used by the API to remotely subscribe to the service loop -// The params must be rlp serializable and satisfy the SubscriptionSettings() interface -func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params eth.SubscriptionSettings) { - sap.serveWg.Add(1) - defer sap.serveWg.Done() - log.Infof("new eth ipld subscription %s", id) - subscription := Subscription{ - ID: id, - PayloadChan: sub, - QuitChan: quitChan, - } - // Subscription type is defined as the hash of the rlp-serialized subscription settings - by, err := rlp.EncodeToBytes(params) - if err != nil { - sendNonBlockingErr(subscription, err) - sendNonBlockingQuit(subscription) - return - } - subscriptionType := crypto.Keccak256Hash(by) - if !params.BackFillOnly { - // Add subscriber - sap.Lock() - if sap.Subscriptions[subscriptionType] == nil { - sap.Subscriptions[subscriptionType] = make(map[rpc.ID]Subscription) - } - sap.Subscriptions[subscriptionType][id] = subscription - sap.SubscriptionTypes[subscriptionType] = params - sap.Unlock() - } - // If the subscription requests a backfill, use the Postgres index to lookup and retrieve historical data - // Otherwise we only filter new data as it is streamed in from the state diffing geth node - if params.BackFill || params.BackFillOnly { - if err := sap.sendHistoricalData(subscription, id, params); err != nil { - sendNonBlockingErr(subscription, fmt.Errorf("eth ipld server subscription backfill error: %v", err)) - sendNonBlockingQuit(subscription) - return - } - } -} - -// sendHistoricalData sends historical data to the requesting subscription -func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params eth.SubscriptionSettings) error { - log.Infof("sending eth ipld historical data to subscription %s", id) - // Retrieve cached CIDs relevant to this subscriber - var endingBlock int64 - var startingBlock int64 - var err error - startingBlock, err = sap.Retriever.RetrieveFirstBlockNumber() - if err != nil { - return err - } - if startingBlock < params.Start.Int64() { - startingBlock = params.Start.Int64() - } - endingBlock, err = sap.Retriever.RetrieveLastBlockNumber() - if err != nil { - return err - } - if endingBlock > params.End.Int64() && params.End.Int64() > 0 && params.End.Int64() > startingBlock { - endingBlock = params.End.Int64() - } - log.Debugf("eth ipld historical data starting block: %d", params.Start.Int64()) - log.Debugf("eth ipld historical data ending block: %d", endingBlock) - go func() { - sap.serveWg.Add(1) - defer sap.serveWg.Done() - for i := startingBlock; i <= endingBlock; i++ { - select { - case <-sap.QuitChan: - log.Infof("ethereum historical data feed to subscription %s closed", id) - return - default: - } - cidWrappers, empty, err := sap.Retriever.Retrieve(params, i) - if err != nil { - sendNonBlockingErr(sub, fmt.Errorf("eth ipld server cid retrieval error at block %d\r%s", i, err.Error())) - continue - } - if empty { - continue - } - for _, cids := range cidWrappers { - response, err := sap.IPLDFetcher.Fetch(cids) - if err != nil { - sendNonBlockingErr(sub, fmt.Errorf("eth ipld server ipld fetching error at block %d\r%s", i, err.Error())) - continue - } - responseRLP, err := rlp.EncodeToBytes(response) - if err != nil { - log.Error(err) - continue - } - select { - case sub.PayloadChan <- SubscriptionPayload{Data: responseRLP, Err: "", Flag: EmptyFlag, Height: response.BlockNumber.Int64()}: - log.Debugf("eth ipld server sending historical data payload to subscription %s", id) - default: - log.Infof("eth ipld server unable to send backFill payload to subscription %s; channel has no receiver", id) - } - } - } - // when we are done backfilling send an empty payload signifying so in the msg - select { - case sub.PayloadChan <- SubscriptionPayload{Data: nil, Err: "", Flag: BackFillCompleteFlag}: - log.Debugf("eth ipld server sending backFill completion notice to subscription %s", id) - default: - log.Infof("eth ipld server unable to send backFill completion notice to subscription %s", id) - } - }() - return nil -} - -// Unsubscribe is used by the API to remotely unsubscribe to the StateDiffingService loop -func (sap *Service) Unsubscribe(id rpc.ID) { - log.Infof("unsubscribing %s from the eth ipld server", id) - sap.Lock() - for ty := range sap.Subscriptions { - delete(sap.Subscriptions[ty], id) - if len(sap.Subscriptions[ty]) == 0 { - // If we removed the last subscription of this type, remove the subscription type outright - delete(sap.Subscriptions, ty) - delete(sap.SubscriptionTypes, ty) - } - } - sap.Unlock() -} - // Start is used to begin the service // This is mostly just to satisfy the node.Service interface func (sap *Service) Start() error { log.Info("starting eth ipld server") wg := new(sync.WaitGroup) - payloadChan := make(chan eth.ConvertedPayload, PayloadChanBufferSize) - sap.Serve(wg, payloadChan) + sap.Serve(wg) return nil } @@ -380,7 +177,6 @@ func (sap *Service) Stop() error { log.Infof("stopping eth ipld server") sap.Lock() close(sap.QuitChan) - sap.close() sap.Unlock() return nil } @@ -389,28 +185,3 @@ func (sap *Service) Stop() error { func (sap *Service) Backend() *eth.Backend { return sap.backend } - -// close is used to close all listening subscriptions -// close needs to be called with subscription access locked -func (sap *Service) close() { - log.Infof("closing all eth ipld server subscriptions") - for subType, subs := range sap.Subscriptions { - for _, sub := range subs { - sendNonBlockingQuit(sub) - } - delete(sap.Subscriptions, subType) - delete(sap.SubscriptionTypes, subType) - } -} - -// closeType is used to close all subscriptions of given type -// closeType needs to be called with subscription access locked -func (sap *Service) closeType(subType common.Hash) { - log.Infof("closing all eth ipld server subscriptions of type %s", subType.String()) - subs := sap.Subscriptions[subType] - for _, sub := range subs { - sendNonBlockingQuit(sub) - } - delete(sap.Subscriptions, subType) - delete(sap.SubscriptionTypes, subType) -} diff --git a/pkg/serve/subscription.go b/pkg/serve/subscription.go deleted file mode 100644 index 413835904..000000000 --- a/pkg/serve/subscription.go +++ /dev/null @@ -1,60 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package serve - -import ( - "errors" - - "github.com/ethereum/go-ethereum/rpc" -) - -type Flag int32 - -const ( - EmptyFlag Flag = iota - BackFillCompleteFlag -) - -// Subscription holds the information for an individual client subscription to the watcher -type Subscription struct { - ID rpc.ID - PayloadChan chan<- SubscriptionPayload - QuitChan chan<- bool -} - -// SubscriptionPayload is the struct for a watcher data subscription payload -// It carries data of a type specific to the chain being supported/queried and an error message -type SubscriptionPayload struct { - Data []byte `json:"data"` // e.g. for Ethereum rlp serialized eth.StreamPayload - Height int64 `json:"height"` - Err string `json:"err"` // field for error - Flag Flag `json:"flag"` // field for message -} - -func (sp SubscriptionPayload) Error() error { - if sp.Err == "" { - return nil - } - return errors.New(sp.Err) -} - -func (sp SubscriptionPayload) BackFillComplete() bool { - if sp.Flag == BackFillCompleteFlag { - return true - } - return false -}