diff --git a/eth/filters/api.go b/eth/filters/api.go index 31de091838..a7a1c16d9b 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -39,7 +39,6 @@ type filter struct { typ Type deadline *time.Timer // filter is inactiv when deadline triggers hashes []common.Hash - txs []*types.Transaction crit FilterCriteria logs []*types.Log s *Subscription // associated subscription in event system @@ -100,7 +99,7 @@ func (api *PublicFilterAPI) timeoutLoop(timeout time.Duration) { } } -// NewPendingTransactionFilter creates a filter that fetches pending transactions +// NewPendingTransactionFilter creates a filter that fetches pending transaction hashes // as transactions enter the pending state. // // It is part of the filter package because this filter can be used through the @@ -109,20 +108,20 @@ func (api *PublicFilterAPI) timeoutLoop(timeout time.Duration) { // https://eth.wiki/json-rpc/API#eth_newpendingtransactionfilter func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { var ( - pendingTxs = make(chan []*types.Transaction) + pendingTxs = make(chan []common.Hash) pendingTxSub = api.events.SubscribePendingTxs(pendingTxs) ) api.filtersMu.Lock() - api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), txs: make([]*types.Transaction, 0), s: pendingTxSub} + api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: pendingTxSub} api.filtersMu.Unlock() gopool.Submit(func() { for { select { - case pTx := <-pendingTxs: + case ph := <-pendingTxs: api.filtersMu.Lock() if f, found := api.filters[pendingTxSub.ID]; found { - f.txs = append(f.txs, pTx...) + f.hashes = append(f.hashes, ph...) } api.filtersMu.Unlock() case <-pendingTxSub.Err(): @@ -137,10 +136,9 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { return pendingTxSub.ID } -// NewPendingTransactions creates a subscription that is triggered each time a -// transaction enters the transaction pool. If fullTx is true the full tx is -// sent to the client, otherwise the hash is sent. -func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (*rpc.Subscription, error) { +// NewPendingTransactions creates a subscription that is triggered each time a transaction +// enters the transaction pool and was signed from one of the transactions this nodes manages. +func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) { notifier, supported := rpc.NotifierFromContext(ctx) if !supported { return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported @@ -149,20 +147,16 @@ func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context, fullTx * rpcSub := notifier.CreateSubscription() gopool.Submit(func() { - txs := make(chan []*types.Transaction, 128) - pendingTxSub := api.events.SubscribePendingTxs(txs) + txHashes := make(chan []common.Hash, 128) + pendingTxSub := api.events.SubscribePendingTxs(txHashes) for { select { - case txs := <-txs: + case hashes := <-txHashes: // To keep the original behaviour, send a single tx hash in one notification. // TODO(rjl493456442) Send a batch of tx hashes in one notification - for _, tx := range txs { - if fullTx != nil && *fullTx { - notifier.Notify(rpcSub.ID, tx) - } else { - notifier.Notify(rpcSub.ID, tx.Hash()) - } + for _, h := range hashes { + notifier.Notify(rpcSub.ID, h) } case <-rpcSub.Err(): pendingTxSub.Unsubscribe() @@ -557,14 +551,10 @@ func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) { f.deadline.Reset(api.timeout) switch f.typ { - case BlocksSubscription, FinalizedHeadersSubscription, VotesSubscription: + case PendingTransactionsSubscription, BlocksSubscription, FinalizedHeadersSubscription, VotesSubscription: hashes := f.hashes f.hashes = nil return returnHashes(hashes), nil - case PendingTransactionsSubscription: - txs := f.txs - f.txs = nil - return txs, nil case LogsSubscription, MinedAndPendingLogsSubscription: logs := f.logs f.logs = nil diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 7186e7f7d7..bf461a1c5c 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -47,8 +47,8 @@ const ( PendingLogsSubscription // MinedAndPendingLogsSubscription queries for logs in mined and pending blocks. MinedAndPendingLogsSubscription - // PendingTransactionsSubscription queries for pending transactions entering - // the pending state + // PendingTransactionsSubscription queries tx hashes for pending + // transactions entering the pending state PendingTransactionsSubscription // BlocksSubscription queries hashes for blocks that are imported BlocksSubscription @@ -83,7 +83,7 @@ type subscription struct { created time.Time logsCrit ethereum.FilterQuery logs chan []*types.Log - txs chan []*types.Transaction + hashes chan []common.Hash headers chan *types.Header finalizedHeaders chan *types.Header votes chan *types.VoteEnvelope @@ -187,7 +187,7 @@ func (sub *Subscription) Unsubscribe() { case sub.es.uninstall <- sub.f: break uninstallLoop case <-sub.f.logs: - case <-sub.f.txs: + case <-sub.f.hashes: case <-sub.f.headers: case <-sub.f.votes: } @@ -255,7 +255,7 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs logsCrit: crit, created: time.Now(), logs: logs, - txs: make(chan []*types.Transaction), + hashes: make(chan []common.Hash), headers: make(chan *types.Header), votes: make(chan *types.VoteEnvelope), installed: make(chan struct{}), @@ -273,7 +273,7 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ logsCrit: crit, created: time.Now(), logs: logs, - txs: make(chan []*types.Transaction), + hashes: make(chan []common.Hash), headers: make(chan *types.Header), votes: make(chan *types.VoteEnvelope), installed: make(chan struct{}), @@ -291,7 +291,7 @@ func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan logsCrit: crit, created: time.Now(), logs: logs, - txs: make(chan []*types.Transaction), + hashes: make(chan []common.Hash), headers: make(chan *types.Header), votes: make(chan *types.VoteEnvelope), installed: make(chan struct{}), @@ -308,7 +308,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti typ: BlocksSubscription, created: time.Now(), logs: make(chan []*types.Log), - txs: make(chan []*types.Transaction), + hashes: make(chan []common.Hash), headers: headers, votes: make(chan *types.VoteEnvelope), installed: make(chan struct{}), @@ -325,7 +325,7 @@ func (es *EventSystem) SubscribeNewFinalizedHeaders(headers chan *types.Header) typ: FinalizedHeadersSubscription, created: time.Now(), logs: make(chan []*types.Log), - txs: make(chan []*types.Transaction), + hashes: make(chan []common.Hash), headers: headers, votes: make(chan *types.VoteEnvelope), installed: make(chan struct{}), @@ -334,15 +334,15 @@ func (es *EventSystem) SubscribeNewFinalizedHeaders(headers chan *types.Header) return es.subscribe(sub) } -// SubscribePendingTxs creates a subscription that writes transactions for +// SubscribePendingTxs creates a subscription that writes transaction hashes for // transactions that enter the transaction pool. -func (es *EventSystem) SubscribePendingTxs(txs chan []*types.Transaction) *Subscription { +func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription { sub := &subscription{ id: rpc.NewID(), typ: PendingTransactionsSubscription, created: time.Now(), logs: make(chan []*types.Log), - txs: txs, + hashes: hashes, headers: make(chan *types.Header), votes: make(chan *types.VoteEnvelope), installed: make(chan struct{}), @@ -359,7 +359,7 @@ func (es *EventSystem) SubscribeNewVotes(votes chan *types.VoteEnvelope) *Subscr typ: VotesSubscription, created: time.Now(), logs: make(chan []*types.Log), - txs: make(chan []*types.Transaction), + hashes: make(chan []common.Hash), headers: make(chan *types.Header), votes: votes, installed: make(chan struct{}), @@ -404,8 +404,12 @@ func (es *EventSystem) handleRemovedLogs(filters filterIndex, ev core.RemovedLog } func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) { + hashes := make([]common.Hash, 0, len(ev.Txs)) + for _, tx := range ev.Txs { + hashes = append(hashes, tx.Hash()) + } for _, f := range filters[PendingTransactionsSubscription] { - f.txs <- ev.Txs + f.hashes <- hashes } } diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 3acc0b6da9..737eaa17c7 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -250,7 +250,7 @@ func TestPendingTxFilter(t *testing.T) { types.NewTransaction(4, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil), } - txs []*types.Transaction + hashes []common.Hash ) fid0 := api.NewPendingTransactionFilter() @@ -265,9 +265,9 @@ func TestPendingTxFilter(t *testing.T) { t.Fatalf("Unable to retrieve logs: %v", err) } - tx := results.([]*types.Transaction) - txs = append(txs, tx...) - if len(txs) >= len(transactions) { + h := results.([]common.Hash) + hashes = append(hashes, h...) + if len(hashes) >= len(transactions) { break } // check timeout @@ -278,13 +278,13 @@ func TestPendingTxFilter(t *testing.T) { time.Sleep(100 * time.Millisecond) } - if len(txs) != len(transactions) { - t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(txs)) + if len(hashes) != len(transactions) { + t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(hashes)) return } - for i := range txs { - if txs[i].Hash() != transactions[i].Hash() { - t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), txs[i].Hash()) + for i := range hashes { + if hashes[i] != transactions[i].Hash() { + t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), hashes[i]) } } } @@ -715,11 +715,11 @@ func TestPendingTxFilterDeadlock(t *testing.T) { fids[i] = fid // Wait for at least one tx to arrive in filter for { - txs, err := api.GetFilterChanges(fid) + hashes, err := api.GetFilterChanges(fid) if err != nil { t.Fatalf("Filter should exist: %v\n", err) } - if len(txs.([]*types.Transaction)) > 0 { + if len(hashes.([]common.Hash)) > 0 { break } runtime.Gosched() diff --git a/ethclient/gethclient/gethclient.go b/ethclient/gethclient/gethclient.go index 72dc14d7c1..3795b182e1 100644 --- a/ethclient/gethclient/gethclient.go +++ b/ethclient/gethclient/gethclient.go @@ -175,12 +175,7 @@ func (ec *Client) GetNodeInfo(ctx context.Context) (*p2p.NodeInfo, error) { return &result, err } -// SubscribeFullPendingTransactions subscribes to new pending transactions. -func (ec *Client) SubscribeFullPendingTransactions(ctx context.Context, ch chan<- *types.Transaction) (*rpc.ClientSubscription, error) { - return ec.c.EthSubscribe(ctx, ch, "newPendingTransactions", true) -} - -// SubscribePendingTransactions subscribes to new pending transaction hashes. +// SubscribePendingTransactions subscribes to new pending transactions. func (ec *Client) SubscribePendingTransactions(ctx context.Context, ch chan<- common.Hash) (*rpc.ClientSubscription, error) { return ec.c.EthSubscribe(ctx, ch, "newPendingTransactions") } diff --git a/ethclient/gethclient/gethclient_test.go b/ethclient/gethclient/gethclient_test.go index 1fb2683e30..d8f9385690 100644 --- a/ethclient/gethclient/gethclient_test.go +++ b/ethclient/gethclient/gethclient_test.go @@ -122,12 +122,9 @@ func TestGethClient(t *testing.T) { }, { "TestSetHead", func(t *testing.T) { testSetHead(t, client) }, - }, { - "TestSubscribePendingTxHashes", - func(t *testing.T) { testSubscribePendingTransactions(t, client) }, }, { "TestSubscribePendingTxs", - func(t *testing.T) { testSubscribeFullPendingTransactions(t, client) }, + func(t *testing.T) { testSubscribePendingTransactions(t, client) }, }, { "TestCallContract", func(t *testing.T) { testCallContract(t, client) }, @@ -301,40 +298,6 @@ func testSubscribePendingTransactions(t *testing.T, client *rpc.Client) { } } -func testSubscribeFullPendingTransactions(t *testing.T, client *rpc.Client) { - ec := New(client) - ethcl := ethclient.NewClient(client) - // Subscribe to Transactions - ch := make(chan *types.Transaction) - ec.SubscribeFullPendingTransactions(context.Background(), ch) - // Send a transaction - chainID, err := ethcl.ChainID(context.Background()) - if err != nil { - t.Fatal(err) - } - // Create transaction - tx := types.NewTransaction(1, common.Address{1}, big.NewInt(1), 22000, big.NewInt(1), nil) - signer := types.LatestSignerForChainID(chainID) - signature, err := crypto.Sign(signer.Hash(tx).Bytes(), testKey) - if err != nil { - t.Fatal(err) - } - signedTx, err := tx.WithSignature(signer, signature) - if err != nil { - t.Fatal(err) - } - // Send transaction - err = ethcl.SendTransaction(context.Background(), signedTx) - if err != nil { - t.Fatal(err) - } - // Check that the transaction was send over the channel - tx = <-ch - if tx.Hash() != signedTx.Hash() { - t.Fatalf("Invalid tx hash received, got %v, want %v", tx.Hash(), signedTx.Hash()) - } -} - func testCallContract(t *testing.T, client *rpc.Client) { ec := New(client) msg := ethereum.CallMsg{