From 82b0dec7135b281c1b03064d50959dc992c2f94f Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Mon, 22 Apr 2024 10:31:17 +0200 Subject: [PATCH] eth/filters: remove support for pending logs (#29574) This change removes support for subscribing to pending logs. "Pending logs" were always an odd feature, because it can never be fully reliable. When support for it was added many years ago, the intention was for this to be used by wallet apps to show the 'potential future token balance' of accounts, i.e. as a way of notifying the user of incoming transfers before they were mined. In order to generate the pending logs, the node must pick a subset of all public mempool transactions, execute them in the EVM, and then dispatch the resulting logs to API consumers. --- cmd/utils/flags.go | 2 +- eth/filters/api.go | 15 +- eth/filters/filter.go | 43 +-- eth/filters/filter_system.go | 195 +----------- eth/filters/filter_system_test.go | 387 +----------------------- eth/filters/filter_test.go | 12 +- ethclient/gethclient/gethclient_test.go | 2 +- ethclient/simulated/backend.go | 2 +- 8 files changed, 44 insertions(+), 614 deletions(-) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 05bf649d38b6..8ef9d9e7e836 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1959,7 +1959,7 @@ func RegisterFilterAPI(stack *node.Node, backend ethapi.Backend, ethcfg *ethconf }) stack.RegisterAPIs([]rpc.API{{ Namespace: "eth", - Service: filters.NewFilterAPI(filterSystem, false), + Service: filters.NewFilterAPI(filterSystem), }}) return filterSystem } diff --git a/eth/filters/api.go b/eth/filters/api.go index 56a9de1b215f..23fb1faca896 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -34,10 +34,11 @@ import ( ) var ( - errInvalidTopic = errors.New("invalid topic(s)") - errFilterNotFound = errors.New("filter not found") - errInvalidBlockRange = errors.New("invalid block range params") - errExceedMaxTopics = errors.New("exceed max topics") + errInvalidTopic = errors.New("invalid topic(s)") + errFilterNotFound = errors.New("filter not found") + errInvalidBlockRange = errors.New("invalid block range params") + errPendingLogsUnsupported = errors.New("pending logs are not supported") + errExceedMaxTopics = errors.New("exceed max topics") ) // The maximum number of topic criteria allowed, vm.LOG4 - vm.LOG0 @@ -70,10 +71,10 @@ type FilterAPI struct { } // NewFilterAPI returns a new FilterAPI instance. -func NewFilterAPI(system *FilterSystem, lightMode bool) *FilterAPI { +func NewFilterAPI(system *FilterSystem) *FilterAPI { api := &FilterAPI{ sys: system, - events: NewEventSystem(system, lightMode), + events: NewEventSystem(system), filters: make(map[rpc.ID]*filter), timeout: system.cfg.Timeout, } @@ -456,7 +457,7 @@ func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) { f.txs = nil return hashes, nil } - case LogsSubscription, MinedAndPendingLogsSubscription: + case LogsSubscription: logs := f.logs f.logs = nil return returnLogs(logs), nil diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 2f59026b73ed..09ccb939073a 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -108,19 +108,9 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { return f.blockLogs(ctx, header) } - var ( - beginPending = f.begin == rpc.PendingBlockNumber.Int64() - endPending = f.end == rpc.PendingBlockNumber.Int64() - ) - - // special case for pending logs - if beginPending && !endPending { - return nil, errInvalidBlockRange - } - - // Short-cut if all we care about is pending logs - if beginPending && endPending { - return f.pendingLogs(), nil + // Disallow pending logs. + if f.begin == rpc.PendingBlockNumber.Int64() || f.end == rpc.PendingBlockNumber.Int64() { + return nil, errPendingLogsUnsupported } resolveSpecial := func(number int64) (int64, error) { @@ -165,16 +155,7 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { case log := <-logChan: logs = append(logs, log) case err := <-errChan: - if err != nil { - // if an error occurs during extraction, we do return the extracted data - return logs, err - } - // Append the pending ones - if endPending { - pendingLogs := f.pendingLogs() - logs = append(logs, pendingLogs...) - } - return logs, nil + return logs, err } } } @@ -332,22 +313,6 @@ func (f *Filter) checkMatches(ctx context.Context, header *types.Header) ([]*typ return logs, nil } -// pendingLogs returns the logs matching the filter criteria within the pending block. -func (f *Filter) pendingLogs() []*types.Log { - block, receipts, _ := f.sys.backend.Pending() - if block == nil || receipts == nil { - return nil - } - if bloomFilter(block.Bloom(), f.addresses, f.topics) { - var unfiltered []*types.Log - for _, r := range receipts { - unfiltered = append(unfiltered, r.Logs...) - } - return filterLogs(unfiltered, nil, nil, f.addresses, f.topics) - } - return nil -} - // filterLogs creates a slice of logs matching the given criteria. func filterLogs(logs []*types.Log, fromBlock, toBlock *big.Int, addresses []common.Address, topics [][]common.Hash) []*types.Log { var check = func(log *types.Log) bool { diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index d8b41a425973..a3a2787a4144 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -30,8 +30,6 @@ import ( "github.com/ethereum/go-ethereum/common/lru" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/bloombits" - "github.com/ethereum/go-ethereum/core/rawdb" - "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" @@ -63,7 +61,6 @@ type Backend interface { GetBody(ctx context.Context, hash common.Hash, number rpc.BlockNumber) (*types.Body, error) GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error) - Pending() (*types.Block, types.Receipts, *state.StateDB) CurrentHeader() *types.Header ChainConfig() *params.ChainConfig @@ -152,10 +149,6 @@ const ( UnknownSubscription Type = iota // LogsSubscription queries for new or removed (chain reorg) logs LogsSubscription - // PendingLogsSubscription queries for logs in pending blocks - PendingLogsSubscription - // MinedAndPendingLogsSubscription queries for logs in mined and pending blocks. - MinedAndPendingLogsSubscription // PendingTransactionsSubscription queries for pending transactions entering // the pending state PendingTransactionsSubscription @@ -192,10 +185,8 @@ type subscription struct { // EventSystem creates subscriptions, processes events and broadcasts them to the // subscription which match the subscription criteria. type EventSystem struct { - backend Backend - sys *FilterSystem - lightMode bool - lastHead *types.Header + backend Backend + sys *FilterSystem // Subscriptions txsSub event.Subscription // Subscription for new transaction event @@ -218,11 +209,10 @@ type EventSystem struct { // // The returned manager has a loop that needs to be stopped with the Stop function // or by stopping the given mux. -func NewEventSystem(sys *FilterSystem, lightMode bool) *EventSystem { +func NewEventSystem(sys *FilterSystem) *EventSystem { m := &EventSystem{ sys: sys, backend: sys.backend, - lightMode: lightMode, install: make(chan *subscription), uninstall: make(chan *subscription), txsCh: make(chan core.NewTxsEvent, txChanSize), @@ -310,10 +300,11 @@ func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*typ to = rpc.BlockNumber(crit.ToBlock.Int64()) } - // only interested in pending logs - if from == rpc.PendingBlockNumber && to == rpc.PendingBlockNumber { - return es.subscribePendingLogs(crit, logs), nil + // Pending logs are not supported anymore. + if from == rpc.PendingBlockNumber || to == rpc.PendingBlockNumber { + return nil, errPendingLogsUnsupported } + // only interested in new mined logs if from == rpc.LatestBlockNumber && to == rpc.LatestBlockNumber { return es.subscribeLogs(crit, logs), nil @@ -322,10 +313,6 @@ func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*typ if from >= 0 && to >= 0 && to >= from { return es.subscribeLogs(crit, logs), nil } - // interested in mined logs from a specific block number, new logs and pending logs - if from >= rpc.LatestBlockNumber && to == rpc.PendingBlockNumber { - return es.subscribeMinedPendingLogs(crit, logs), nil - } // interested in logs from a specific block number to new mined blocks if from >= 0 && to == rpc.LatestBlockNumber { return es.subscribeLogs(crit, logs), nil @@ -333,23 +320,6 @@ func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*typ return nil, errInvalidBlockRange } -// subscribeMinedPendingLogs creates a subscription that returned mined and -// pending logs that match the given criteria. -func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription { - sub := &subscription{ - id: rpc.NewID(), - typ: MinedAndPendingLogsSubscription, - logsCrit: crit, - created: time.Now(), - logs: logs, - txs: make(chan []*types.Transaction), - headers: make(chan *types.Header), - installed: make(chan struct{}), - err: make(chan error), - } - return es.subscribe(sub) -} - // subscribeLogs creates a subscription that will write all logs matching the // given criteria to the given logs channel. func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription { @@ -367,23 +337,6 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ return es.subscribe(sub) } -// subscribePendingLogs creates a subscription that writes contract event logs for -// transactions that enter the transaction pool. -func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription { - sub := &subscription{ - id: rpc.NewID(), - typ: PendingLogsSubscription, - logsCrit: crit, - created: time.Now(), - logs: logs, - txs: make(chan []*types.Transaction), - headers: make(chan *types.Header), - installed: make(chan struct{}), - err: make(chan error), - } - return es.subscribe(sub) -} - // SubscribeNewHeads creates a subscription that writes the header of a block that is // imported in the chain. func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription { @@ -430,18 +383,6 @@ func (es *EventSystem) handleLogs(filters filterIndex, ev []*types.Log) { } } -func (es *EventSystem) handlePendingLogs(filters filterIndex, logs []*types.Log) { - if len(logs) == 0 { - return - } - for _, f := range filters[PendingLogsSubscription] { - matchedLogs := filterLogs(logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics) - if len(matchedLogs) > 0 { - f.logs <- matchedLogs - } - } -} - func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) { for _, f := range filters[PendingTransactionsSubscription] { f.txs <- ev.Txs @@ -452,91 +393,6 @@ func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent) for _, f := range filters[BlocksSubscription] { f.headers <- ev.Block.Header() } - if es.lightMode && len(filters[LogsSubscription]) > 0 { - es.lightFilterNewHead(ev.Block.Header(), func(header *types.Header, remove bool) { - for _, f := range filters[LogsSubscription] { - if f.logsCrit.FromBlock != nil && header.Number.Cmp(f.logsCrit.FromBlock) < 0 { - continue - } - if f.logsCrit.ToBlock != nil && header.Number.Cmp(f.logsCrit.ToBlock) > 0 { - continue - } - if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 { - f.logs <- matchedLogs - } - } - }) - } -} - -func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func(*types.Header, bool)) { - oldh := es.lastHead - es.lastHead = newHeader - if oldh == nil { - return - } - newh := newHeader - // find common ancestor, create list of rolled back and new block hashes - var oldHeaders, newHeaders []*types.Header - for oldh.Hash() != newh.Hash() { - if oldh.Number.Uint64() >= newh.Number.Uint64() { - oldHeaders = append(oldHeaders, oldh) - oldh = rawdb.ReadHeader(es.backend.ChainDb(), oldh.ParentHash, oldh.Number.Uint64()-1) - } - if oldh.Number.Uint64() < newh.Number.Uint64() { - newHeaders = append(newHeaders, newh) - newh = rawdb.ReadHeader(es.backend.ChainDb(), newh.ParentHash, newh.Number.Uint64()-1) - if newh == nil { - // happens when CHT syncing, nothing to do - newh = oldh - } - } - } - // roll back old blocks - for _, h := range oldHeaders { - callBack(h, true) - } - // check new blocks (array is in reverse order) - for i := len(newHeaders) - 1; i >= 0; i-- { - callBack(newHeaders[i], false) - } -} - -// filter logs of a single header in light client mode -func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.Address, topics [][]common.Hash, remove bool) []*types.Log { - if !bloomFilter(header.Bloom, addresses, topics) { - return nil - } - // Get the logs of the block - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - defer cancel() - cached, err := es.sys.cachedLogElem(ctx, header.Hash(), header.Number.Uint64()) - if err != nil { - return nil - } - unfiltered := append([]*types.Log{}, cached.logs...) - for i, log := range unfiltered { - // Don't modify in-cache elements - logcopy := *log - logcopy.Removed = remove - // Swap copy in-place - unfiltered[i] = &logcopy - } - logs := filterLogs(unfiltered, nil, nil, addresses, topics) - // Txhash is already resolved - if len(logs) > 0 && logs[0].TxHash != (common.Hash{}) { - return logs - } - // Resolve txhash - body, err := es.sys.cachedGetBody(ctx, cached, header.Hash(), header.Number.Uint64()) - if err != nil { - return nil - } - for _, log := range logs { - // logs are already copied, safe to modify - log.TxHash = body.Transactions[log.TxIndex].Hash() - } - return logs } // eventLoop (un)installs filters and processes mux events. @@ -564,46 +420,13 @@ func (es *EventSystem) eventLoop() { es.handleLogs(index, ev.Logs) case ev := <-es.chainCh: es.handleChainEvent(index, ev) - // If we have no pending log subscription, - // we don't need to collect any pending logs. - if len(index[PendingLogsSubscription]) == 0 { - continue - } - - // Pull the pending logs if there is a new chain head. - pendingBlock, pendingReceipts, _ := es.backend.Pending() - if pendingBlock == nil || pendingReceipts == nil { - continue - } - if pendingBlock.ParentHash() != ev.Block.Hash() { - continue - } - var logs []*types.Log - for _, receipt := range pendingReceipts { - if len(receipt.Logs) > 0 { - logs = append(logs, receipt.Logs...) - } - } - es.handlePendingLogs(index, logs) case f := <-es.install: - if f.typ == MinedAndPendingLogsSubscription { - // the type are logs and pending logs subscriptions - index[LogsSubscription][f.id] = f - index[PendingLogsSubscription][f.id] = f - } else { - index[f.typ][f.id] = f - } + index[f.typ][f.id] = f close(f.installed) case f := <-es.uninstall: - if f.typ == MinedAndPendingLogsSubscription { - // the type are logs and pending logs subscriptions - delete(index[LogsSubscription], f.id) - delete(index[PendingLogsSubscription], f.id) - } else { - delete(index[f.typ], f.id) - } + delete(index[f.typ], f.id) close(f.err) // System stopped diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 4a0f40cce934..013b9f7bc26c 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -19,7 +19,6 @@ package filters import ( "context" "errors" - "fmt" "math/big" "math/rand" "reflect" @@ -27,15 +26,12 @@ import ( "testing" "time" - "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/rawdb" - "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/internal/ethapi" @@ -125,10 +121,6 @@ func (b *testBackend) GetLogs(ctx context.Context, hash common.Hash, number uint return logs, nil } -func (b *testBackend) Pending() (*types.Block, types.Receipts, *state.StateDB) { - return b.pendingBlock, b.pendingReceipts, nil -} - func (b *testBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { return b.txFeed.Subscribe(ch) } @@ -181,15 +173,6 @@ func (b *testBackend) setPending(block *types.Block, receipts types.Receipts) { b.pendingReceipts = receipts } -func (b *testBackend) notifyPending(logs []*types.Log) { - genesis := &core.Genesis{ - Config: params.TestChainConfig, - } - _, blocks, _ := core.GenerateChainWithGenesis(genesis, ethash.NewFaker(), 2, func(i int, b *core.BlockGen) {}) - b.setPending(blocks[1], []*types.Receipt{{Logs: logs}}) - b.chainFeed.Send(core.ChainEvent{Block: blocks[0]}) -} - func newTestFilterSystem(t testing.TB, db ethdb.Database, cfg Config) (*testBackend, *FilterSystem) { backend := &testBackend{db: db} sys := NewFilterSystem(backend, cfg) @@ -207,7 +190,7 @@ func TestBlockSubscription(t *testing.T) { var ( db = rawdb.NewMemoryDatabase() backend, sys = newTestFilterSystem(t, db, Config{}) - api = NewFilterAPI(sys, false) + api = NewFilterAPI(sys) genesis = &core.Genesis{ Config: params.TestChainConfig, BaseFee: big.NewInt(params.InitialBaseFee), @@ -262,7 +245,7 @@ func TestPendingTxFilter(t *testing.T) { var ( db = rawdb.NewMemoryDatabase() backend, sys = newTestFilterSystem(t, db, Config{}) - api = NewFilterAPI(sys, false) + api = NewFilterAPI(sys) transactions = []*types.Transaction{ types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil), @@ -318,7 +301,7 @@ func TestPendingTxFilterFullTx(t *testing.T) { var ( db = rawdb.NewMemoryDatabase() backend, sys = newTestFilterSystem(t, db, Config{}) - api = NewFilterAPI(sys, false) + api = NewFilterAPI(sys) transactions = []*types.Transaction{ types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil), @@ -374,7 +357,7 @@ func TestLogFilterCreation(t *testing.T) { var ( db = rawdb.NewMemoryDatabase() _, sys = newTestFilterSystem(t, db, Config{}) - api = NewFilterAPI(sys, false) + api = NewFilterAPI(sys) testCases = []struct { crit FilterCriteria @@ -386,8 +369,6 @@ func TestLogFilterCreation(t *testing.T) { {FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2)}, true}, // "mined" block range to pending {FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, true}, - // new mined and pending blocks - {FilterCriteria{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, true}, // from block "higher" than to block {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(1)}, false}, // from block "higher" than to block @@ -423,7 +404,7 @@ func TestInvalidLogFilterCreation(t *testing.T) { var ( db = rawdb.NewMemoryDatabase() _, sys = newTestFilterSystem(t, db, Config{}) - api = NewFilterAPI(sys, false) + api = NewFilterAPI(sys) ) // different situations where log filter creation should fail. @@ -449,7 +430,7 @@ func TestInvalidGetLogsRequest(t *testing.T) { var ( db = rawdb.NewMemoryDatabase() _, sys = newTestFilterSystem(t, db, Config{}) - api = NewFilterAPI(sys, false) + api = NewFilterAPI(sys) blockHash = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111") ) @@ -475,7 +456,7 @@ func TestInvalidGetRangeLogsRequest(t *testing.T) { var ( db = rawdb.NewMemoryDatabase() _, sys = newTestFilterSystem(t, db, Config{}) - api = NewFilterAPI(sys, false) + api = NewFilterAPI(sys) ) if _, err := api.GetLogs(context.Background(), FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(1)}); err != errInvalidBlockRange { @@ -490,7 +471,7 @@ func TestLogFilter(t *testing.T) { var ( db = rawdb.NewMemoryDatabase() backend, sys = newTestFilterSystem(t, db, Config{}) - api = NewFilterAPI(sys, false) + api = NewFilterAPI(sys) firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") @@ -509,9 +490,6 @@ func TestLogFilter(t *testing.T) { {Address: thirdAddress, Topics: []common.Hash{secondTopic}, BlockNumber: 3}, } - expectedCase7 = []*types.Log{allLogs[3], allLogs[4], allLogs[0], allLogs[1], allLogs[2], allLogs[3], allLogs[4]} - expectedCase11 = []*types.Log{allLogs[1], allLogs[2], allLogs[1], allLogs[2]} - testCases = []struct { crit FilterCriteria expected []*types.Log @@ -529,20 +507,14 @@ func TestLogFilter(t *testing.T) { 4: {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, allLogs[3:5], ""}, // match logs based on multiple addresses and "or" topics 5: {FilterCriteria{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, allLogs[2:5], ""}, - // logs in the pending block - 6: {FilterCriteria{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, allLogs[:2], ""}, - // mined logs with block num >= 2 or pending logs - 7: {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, expectedCase7, ""}, // all "mined" logs with block num >= 2 - 8: {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, allLogs[3:], ""}, + 6: {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, allLogs[3:], ""}, // all "mined" logs - 9: {FilterCriteria{ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, allLogs, ""}, + 7: {FilterCriteria{ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, allLogs, ""}, // all "mined" logs with 1>= block num <=2 and topic secondTopic - 10: {FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2), Topics: [][]common.Hash{{secondTopic}}}, allLogs[3:4], ""}, - // all "mined" and pending logs with topic firstTopic - 11: {FilterCriteria{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), Topics: [][]common.Hash{{firstTopic}}}, expectedCase11, ""}, + 8: {FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2), Topics: [][]common.Hash{{secondTopic}}}, allLogs[3:4], ""}, // match all logs due to wildcard topic - 12: {FilterCriteria{Topics: [][]common.Hash{nil}}, allLogs[1:], ""}, + 9: {FilterCriteria{Topics: [][]common.Hash{nil}}, allLogs[1:], ""}, } ) @@ -557,16 +529,13 @@ func TestLogFilter(t *testing.T) { t.Fatal("Logs event not delivered") } - // set pending logs - backend.notifyPending(allLogs) - for i, tt := range testCases { var fetched []*types.Log timeout := time.Now().Add(1 * time.Second) for { // fetch all expected logs results, err := api.GetFilterChanges(tt.id) if err != nil { - t.Fatalf("Unable to fetch logs: %v", err) + t.Fatalf("test %d: unable to fetch logs: %v", i, err) } fetched = append(fetched, results.([]*types.Log)...) @@ -597,326 +566,6 @@ func TestLogFilter(t *testing.T) { } } -// TestPendingLogsSubscription tests if a subscription receives the correct pending logs that are posted to the event feed. -func TestPendingLogsSubscription(t *testing.T) { - t.Parallel() - - var ( - db = rawdb.NewMemoryDatabase() - backend, sys = newTestFilterSystem(t, db, Config{}) - api = NewFilterAPI(sys, false) - - firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") - secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") - thirdAddress = common.HexToAddress("0x3333333333333333333333333333333333333333") - notUsedAddress = common.HexToAddress("0x9999999999999999999999999999999999999999") - firstTopic = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111") - secondTopic = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222") - thirdTopic = common.HexToHash("0x3333333333333333333333333333333333333333333333333333333333333333") - fourthTopic = common.HexToHash("0x4444444444444444444444444444444444444444444444444444444444444444") - notUsedTopic = common.HexToHash("0x9999999999999999999999999999999999999999999999999999999999999999") - - allLogs = [][]*types.Log{ - {{Address: firstAddr, Topics: []common.Hash{}, BlockNumber: 0}}, - {{Address: firstAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 1}}, - {{Address: secondAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 2}}, - {{Address: thirdAddress, Topics: []common.Hash{secondTopic}, BlockNumber: 3}}, - {{Address: thirdAddress, Topics: []common.Hash{secondTopic}, BlockNumber: 4}}, - { - {Address: thirdAddress, Topics: []common.Hash{firstTopic}, BlockNumber: 5}, - {Address: thirdAddress, Topics: []common.Hash{thirdTopic}, BlockNumber: 5}, - {Address: thirdAddress, Topics: []common.Hash{fourthTopic}, BlockNumber: 5}, - {Address: firstAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 5}, - }, - } - - pendingBlockNumber = big.NewInt(rpc.PendingBlockNumber.Int64()) - - testCases = []struct { - crit ethereum.FilterQuery - expected []*types.Log - c chan []*types.Log - sub *Subscription - err chan error - }{ - // match all - { - ethereum.FilterQuery{FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber}, - flattenLogs(allLogs), - nil, nil, nil, - }, - // match none due to no matching addresses - { - ethereum.FilterQuery{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber}, - nil, - nil, nil, nil, - }, - // match logs based on addresses, ignore topics - { - ethereum.FilterQuery{Addresses: []common.Address{firstAddr}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber}, - append(flattenLogs(allLogs[:2]), allLogs[5][3]), - nil, nil, nil, - }, - // match none due to no matching topics (match with address) - { - ethereum.FilterQuery{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{{notUsedTopic}}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber}, - nil, - nil, nil, nil, - }, - // match logs based on addresses and topics - { - ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber}, - append(flattenLogs(allLogs[3:5]), allLogs[5][0]), - nil, nil, nil, - }, - // match logs based on multiple addresses and "or" topics - { - ethereum.FilterQuery{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber}, - append(flattenLogs(allLogs[2:5]), allLogs[5][0]), - nil, nil, nil, - }, - // multiple pending logs, should match only 2 topics from the logs in block 5 - { - ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, fourthTopic}}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber}, - []*types.Log{allLogs[5][0], allLogs[5][2]}, - nil, nil, nil, - }, - // match none due to only matching new mined logs - { - ethereum.FilterQuery{}, - nil, - nil, nil, nil, - }, - // match none due to only matching mined logs within a specific block range - { - ethereum.FilterQuery{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2)}, - nil, - nil, nil, nil, - }, - // match all due to matching mined and pending logs - { - ethereum.FilterQuery{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, - flattenLogs(allLogs), - nil, nil, nil, - }, - // match none due to matching logs from a specific block number to new mined blocks - { - ethereum.FilterQuery{FromBlock: big.NewInt(1), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, - nil, - nil, nil, nil, - }, - } - ) - - // create all subscriptions, this ensures all subscriptions are created before the events are posted. - // on slow machines this could otherwise lead to missing events when the subscription is created after - // (some) events are posted. - for i := range testCases { - testCases[i].c = make(chan []*types.Log) - testCases[i].err = make(chan error, 1) - - var err error - testCases[i].sub, err = api.events.SubscribeLogs(testCases[i].crit, testCases[i].c) - if err != nil { - t.Fatalf("SubscribeLogs %d failed: %v\n", i, err) - } - } - - for n, test := range testCases { - i := n - tt := test - go func() { - defer tt.sub.Unsubscribe() - - var fetched []*types.Log - - timeout := time.After(1 * time.Second) - fetchLoop: - for { - select { - case logs := <-tt.c: - // Do not break early if we've fetched greater, or equal, - // to the number of logs expected. This ensures we do not - // deadlock the filter system because it will do a blocking - // send on this channel if another log arrives. - fetched = append(fetched, logs...) - case <-timeout: - break fetchLoop - } - } - - if len(fetched) != len(tt.expected) { - tt.err <- fmt.Errorf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched)) - return - } - - for l := range fetched { - if fetched[l].Removed { - tt.err <- fmt.Errorf("expected log not to be removed for log %d in case %d", l, i) - return - } - if !reflect.DeepEqual(fetched[l], tt.expected[l]) { - tt.err <- fmt.Errorf("invalid log on index %d for case %d\n", l, i) - return - } - } - tt.err <- nil - }() - } - - // set pending logs - var flattenLogs []*types.Log - for _, logs := range allLogs { - flattenLogs = append(flattenLogs, logs...) - } - backend.notifyPending(flattenLogs) - - for i := range testCases { - err := <-testCases[i].err - if err != nil { - t.Fatalf("test %d failed: %v", i, err) - } - <-testCases[i].sub.Err() - } -} - -func TestLightFilterLogs(t *testing.T) { - t.Parallel() - - var ( - db = rawdb.NewMemoryDatabase() - backend, sys = newTestFilterSystem(t, db, Config{}) - api = NewFilterAPI(sys, true) - signer = types.HomesteadSigner{} - - firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") - secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") - thirdAddress = common.HexToAddress("0x3333333333333333333333333333333333333333") - notUsedAddress = common.HexToAddress("0x9999999999999999999999999999999999999999") - firstTopic = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111") - secondTopic = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222") - - // posted twice, once as regular logs and once as pending logs. - allLogs = []*types.Log{ - // Block 1 - {Address: firstAddr, Topics: []common.Hash{}, Data: []byte{}, BlockNumber: 2, Index: 0}, - // Block 2 - {Address: firstAddr, Topics: []common.Hash{firstTopic}, Data: []byte{}, BlockNumber: 3, Index: 0}, - {Address: secondAddr, Topics: []common.Hash{firstTopic}, Data: []byte{}, BlockNumber: 3, Index: 1}, - {Address: thirdAddress, Topics: []common.Hash{secondTopic}, Data: []byte{}, BlockNumber: 3, Index: 2}, - // Block 3 - {Address: thirdAddress, Topics: []common.Hash{secondTopic}, Data: []byte{}, BlockNumber: 4, Index: 0}, - } - - testCases = []struct { - crit FilterCriteria - expected []*types.Log - id rpc.ID - }{ - // match all - 0: {FilterCriteria{}, allLogs, ""}, - // match none due to no matching addresses - 1: {FilterCriteria{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}}, []*types.Log{}, ""}, - // match logs based on addresses, ignore topics - 2: {FilterCriteria{Addresses: []common.Address{firstAddr}}, allLogs[:2], ""}, - // match logs based on addresses and topics - 3: {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, allLogs[3:5], ""}, - // all logs with block num >= 3 - 4: {FilterCriteria{FromBlock: big.NewInt(3), ToBlock: big.NewInt(5)}, allLogs[1:], ""}, - // all logs - 5: {FilterCriteria{FromBlock: big.NewInt(0), ToBlock: big.NewInt(5)}, allLogs, ""}, - // all logs with 1>= block num <=2 and topic secondTopic - 6: {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(3), Topics: [][]common.Hash{{secondTopic}}}, allLogs[3:4], ""}, - } - - key, _ = crypto.GenerateKey() - addr = crypto.PubkeyToAddress(key.PublicKey) - genesis = &core.Genesis{Config: params.TestChainConfig, - Alloc: types.GenesisAlloc{ - addr: {Balance: big.NewInt(params.Ether)}, - }, - } - receipts = []*types.Receipt{{ - Logs: []*types.Log{allLogs[0]}, - }, { - Logs: []*types.Log{allLogs[1], allLogs[2], allLogs[3]}, - }, { - Logs: []*types.Log{allLogs[4]}, - }} - ) - - _, blocks, _ := core.GenerateChainWithGenesis(genesis, ethash.NewFaker(), 4, func(i int, b *core.BlockGen) { - if i == 0 { - return - } - receipts[i-1].Bloom = types.CreateBloom(types.Receipts{receipts[i-1]}) - b.AddUncheckedReceipt(receipts[i-1]) - tx, _ := types.SignTx(types.NewTx(&types.LegacyTx{Nonce: uint64(i - 1), To: &common.Address{}, Value: big.NewInt(1000), Gas: params.TxGas, GasPrice: b.BaseFee(), Data: nil}), signer, key) - b.AddTx(tx) - }) - for i, block := range blocks { - rawdb.WriteBlock(db, block) - rawdb.WriteCanonicalHash(db, block.Hash(), block.NumberU64()) - rawdb.WriteHeadBlockHash(db, block.Hash()) - if i > 0 { - rawdb.WriteReceipts(db, block.Hash(), block.NumberU64(), []*types.Receipt{receipts[i-1]}) - } - } - // create all filters - for i := range testCases { - id, err := api.NewFilter(testCases[i].crit) - if err != nil { - t.Fatal(err) - } - testCases[i].id = id - } - - // raise events - time.Sleep(1 * time.Second) - for _, block := range blocks { - backend.chainFeed.Send(core.ChainEvent{Block: block, Hash: common.Hash{}, Logs: allLogs}) - } - - for i, tt := range testCases { - var fetched []*types.Log - timeout := time.Now().Add(1 * time.Second) - for { // fetch all expected logs - results, err := api.GetFilterChanges(tt.id) - if err != nil { - t.Fatalf("Unable to fetch logs: %v", err) - } - fetched = append(fetched, results.([]*types.Log)...) - if len(fetched) >= len(tt.expected) { - break - } - // check timeout - if time.Now().After(timeout) { - break - } - - time.Sleep(100 * time.Millisecond) - } - - if len(fetched) != len(tt.expected) { - t.Errorf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched)) - return - } - - for l := range fetched { - if fetched[l].Removed { - t.Errorf("expected log not to be removed for log %d in case %d", l, i) - } - expected := *tt.expected[l] - blockNum := expected.BlockNumber - 1 - expected.BlockHash = blocks[blockNum].Hash() - expected.TxHash = blocks[blockNum].Transactions()[0].Hash() - if !reflect.DeepEqual(fetched[l], &expected) { - t.Errorf("invalid log on index %d for case %d", l, i) - } - } - } -} - // TestPendingTxFilterDeadlock tests if the event loop hangs when pending // txes arrive at the same time that one of multiple filters is timing out. // Please refer to #22131 for more details. @@ -927,7 +576,7 @@ func TestPendingTxFilterDeadlock(t *testing.T) { var ( db = rawdb.NewMemoryDatabase() backend, sys = newTestFilterSystem(t, db, Config{Timeout: timeout}) - api = NewFilterAPI(sys, false) + api = NewFilterAPI(sys) done = make(chan struct{}) ) @@ -979,11 +628,3 @@ func TestPendingTxFilterDeadlock(t *testing.T) { } } } - -func flattenLogs(pl [][]*types.Log) []*types.Log { - var logs []*types.Log - for _, l := range pl { - logs = append(logs, l...) - } - return logs -} diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go index 48aaa584dbb1..2b3efb51b17f 100644 --- a/eth/filters/filter_test.go +++ b/eth/filters/filter_test.go @@ -344,16 +344,16 @@ func TestFilters(t *testing.T) { err: "safe header not found", }, { - f: sys.NewRangeFilter(int64(rpc.PendingBlockNumber), int64(rpc.PendingBlockNumber), nil, nil), - want: `[{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696335"],"data":"0x","blockNumber":"0x3e9","transactionHash":"0x4110587c1b8d86edc85dce929a34127f1cb8809515a9f177c91c866de3eb0638","transactionIndex":"0x0","blockHash":"0xd5e8d4e4eb51a2a2a6ec20ef68a4c2801240743c8deb77a6a1d118ac3eefb725","logIndex":"0x0","removed":false}]`, + f: sys.NewRangeFilter(int64(rpc.PendingBlockNumber), int64(rpc.PendingBlockNumber), nil, nil), + err: errPendingLogsUnsupported.Error(), }, { - f: sys.NewRangeFilter(int64(rpc.LatestBlockNumber), int64(rpc.PendingBlockNumber), nil, nil), - want: `[{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696334"],"data":"0x","blockNumber":"0x3e8","transactionHash":"0x9a87842100a638dfa5da8842b4beda691d2fd77b0c84b57f24ecfa9fb208f747","transactionIndex":"0x0","blockHash":"0xb360bad5265261c075ece02d3bf0e39498a6a76310482cdfd90588748e6c5ee0","logIndex":"0x0","removed":false},{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696335"],"data":"0x","blockNumber":"0x3e9","transactionHash":"0x4110587c1b8d86edc85dce929a34127f1cb8809515a9f177c91c866de3eb0638","transactionIndex":"0x0","blockHash":"0xd5e8d4e4eb51a2a2a6ec20ef68a4c2801240743c8deb77a6a1d118ac3eefb725","logIndex":"0x0","removed":false}]`, + f: sys.NewRangeFilter(int64(rpc.LatestBlockNumber), int64(rpc.PendingBlockNumber), nil, nil), + err: errPendingLogsUnsupported.Error(), }, { f: sys.NewRangeFilter(int64(rpc.PendingBlockNumber), int64(rpc.LatestBlockNumber), nil, nil), - err: errInvalidBlockRange.Error(), + err: errPendingLogsUnsupported.Error(), }, } { logs, err := tc.f.Logs(context.Background()) @@ -375,7 +375,7 @@ func TestFilters(t *testing.T) { } t.Run("timeout", func(t *testing.T) { - f := sys.NewRangeFilter(0, -1, nil, nil) + f := sys.NewRangeFilter(0, rpc.LatestBlockNumber.Int64(), nil, nil) ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Hour)) defer cancel() _, err := f.Logs(ctx) diff --git a/ethclient/gethclient/gethclient_test.go b/ethclient/gethclient/gethclient_test.go index 1cd9c5389b89..59ad370146bb 100644 --- a/ethclient/gethclient/gethclient_test.go +++ b/ethclient/gethclient/gethclient_test.go @@ -65,7 +65,7 @@ func newTestBackend(t *testing.T) (*node.Node, []*types.Block) { filterSystem := filters.NewFilterSystem(ethservice.APIBackend, filters.Config{}) n.RegisterAPIs([]rpc.API{{ Namespace: "eth", - Service: filters.NewFilterAPI(filterSystem, false), + Service: filters.NewFilterAPI(filterSystem), }}) // Import the test chain. diff --git a/ethclient/simulated/backend.go b/ethclient/simulated/backend.go index d6fb886ad891..6e07aa68d071 100644 --- a/ethclient/simulated/backend.go +++ b/ethclient/simulated/backend.go @@ -114,7 +114,7 @@ func newWithNode(stack *node.Node, conf *eth.Config, blockPeriod uint64) (*Backe filterSystem := filters.NewFilterSystem(backend.APIBackend, filters.Config{}) stack.RegisterAPIs([]rpc.API{{ Namespace: "eth", - Service: filters.NewFilterAPI(filterSystem, false), + Service: filters.NewFilterAPI(filterSystem), }}) // Start the node if err := stack.Start(); err != nil {