Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/filters: fix a breaking change and return rpctransaction #26757

Merged
merged 2 commits into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type filter struct {
typ Type
deadline *time.Timer // filter is inactive when deadline triggers
hashes []common.Hash
fullTx bool
txs []*types.Transaction
crit FilterCriteria
logs []*types.Log
Expand Down Expand Up @@ -103,14 +104,14 @@ func (api *FilterAPI) timeoutLoop(timeout time.Duration) {
//
// It is part of the filter package because this filter can be used through the
// `eth_getFilterChanges` polling method that is also used for log filters.
func (api *FilterAPI) NewPendingTransactionFilter() rpc.ID {
func (api *FilterAPI) NewPendingTransactionFilter(fullTx *bool) rpc.ID {
var (
pendingTxs = make(chan []*types.Transaction)
pendingTxSub = api.events.SubscribePendingTxs(pendingTxs)
)

api.filtersMu.Lock()
api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), txs: make([]*types.Transaction, 0), s: pendingTxSub}
api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, fullTx: fullTx != nil && *fullTx, deadline: time.NewTimer(api.timeout), txs: make([]*types.Transaction, 0), s: pendingTxSub}
api.filtersMu.Unlock()

go func() {
Expand Down Expand Up @@ -412,6 +413,9 @@ func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
api.filtersMu.Lock()
defer api.filtersMu.Unlock()

chainConfig := api.sys.backend.ChainConfig()
latest := api.sys.backend.CurrentHeader()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be problematic. There might have been multiple blocks produced since the last time you retrieved the changes, in which case NewRPCPendingTransaction will use the latest head for calculating the basefee and whatnot fields vs the one that was actually present when those past blocks were produced.

The only correct place to initialize these fields would be when they are added to the list. Even then it's incorrect to pull the latest block at that point, rather it should be sent along with the transaction event.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed this, but I found that #26126 has ignored it. To elaborate, the basefee solely impacts mined transactions, we solely rely on the latest block number to derive the sender.

With this in mind, I suggest that we also ignore this problem here, and fix them together at a later time.


if f, found := api.filters[id]; found {
if !f.deadline.Stop() {
// timer expired but filter is not yet removed in timeout loop
Expand All @@ -426,9 +430,21 @@ func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
f.hashes = nil
return returnHashes(hashes), nil
case PendingTransactionsSubscription:
txs := f.txs
f.txs = nil
return txs, nil
if f.fullTx {
txs := make([]*ethapi.RPCTransaction, 0, len(f.txs))
for _, tx := range f.txs {
txs = append(txs, ethapi.NewRPCPendingTransaction(tx, latest, chainConfig))
}
f.txs = nil
return txs, nil
} else {
hashes := make([]common.Hash, 0, len(f.txs))
for _, tx := range f.txs {
hashes = append(hashes, tx.Hash())
}
f.txs = nil
return hashes, nil
}
case LogsSubscription, MinedAndPendingLogsSubscription:
logs := f.logs
f.logs = nil
Expand Down
79 changes: 69 additions & 10 deletions eth/filters/filter_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
)
Expand All @@ -52,11 +53,12 @@ type testBackend struct {
}

func (b *testBackend) ChainConfig() *params.ChainConfig {
panic("implement me")
return params.TestChainConfig
}

func (b *testBackend) CurrentHeader() *types.Header {
panic("implement me")
hdr, _ := b.HeaderByNumber(context.TODO(), rpc.LatestBlockNumber)
return hdr
}

func (b *testBackend) ChainDb() ethdb.Database {
Expand Down Expand Up @@ -256,10 +258,10 @@ func TestPendingTxFilter(t *testing.T) {
types.NewTransaction(4, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
}

txs []*types.Transaction
hashes []common.Hash
)

fid0 := api.NewPendingTransactionFilter()
fid0 := api.NewPendingTransactionFilter(nil)

time.Sleep(1 * time.Second)
backend.txFeed.Send(core.NewTxsEvent{Txs: transactions})
Expand All @@ -271,7 +273,64 @@ func TestPendingTxFilter(t *testing.T) {
t.Fatalf("Unable to retrieve logs: %v", err)
}

tx := results.([]*types.Transaction)
h := results.([]common.Hash)
hashes = append(hashes, h...)
if len(hashes) >= len(transactions) {
break
}
// check timeout
if time.Now().After(timeout) {
break
}

time.Sleep(100 * time.Millisecond)
}

if len(hashes) != len(transactions) {
t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(hashes))
return
}
for i := range hashes {
if hashes[i] != transactions[i].Hash() {
t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), hashes[i])
}
}
}

// TestPendingTxFilterFullTx tests whether pending tx filters retrieve all pending transactions that are posted to the event mux.
func TestPendingTxFilterFullTx(t *testing.T) {
t.Parallel()

var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(t, db, Config{})
api = NewFilterAPI(sys, false)

transactions = []*types.Transaction{
types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
types.NewTransaction(1, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
types.NewTransaction(2, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
types.NewTransaction(3, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
types.NewTransaction(4, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
}

txs []*ethapi.RPCTransaction
)

fullTx := true
fid0 := api.NewPendingTransactionFilter(&fullTx)

time.Sleep(1 * time.Second)
backend.txFeed.Send(core.NewTxsEvent{Txs: transactions})

timeout := time.Now().Add(1 * time.Second)
for {
results, err := api.GetFilterChanges(fid0)
if err != nil {
t.Fatalf("Unable to retrieve logs: %v", err)
}

tx := results.([]*ethapi.RPCTransaction)
txs = append(txs, tx...)
if len(txs) >= len(transactions) {
break
Expand All @@ -289,8 +348,8 @@ func TestPendingTxFilter(t *testing.T) {
return
}
for i := range txs {
if txs[i].Hash() != transactions[i].Hash() {
t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), txs[i].Hash())
if txs[i].Hash != transactions[i].Hash() {
t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), txs[i].Hash)
}
}
}
Expand Down Expand Up @@ -854,15 +913,15 @@ func TestPendingTxFilterDeadlock(t *testing.T) {
// timeout either in 100ms or 200ms
fids := make([]rpc.ID, 20)
for i := 0; i < len(fids); i++ {
fid := api.NewPendingTransactionFilter()
fid := api.NewPendingTransactionFilter(nil)
fids[i] = fid
// Wait for at least one tx to arrive in filter
for {
txs, err := api.GetFilterChanges(fid)
hashes, err := api.GetFilterChanges(fid)
if err != nil {
t.Fatalf("Filter should exist: %v\n", err)
}
if len(txs.([]*types.Transaction)) > 0 {
if len(hashes.([]common.Hash)) > 0 {
break
}
runtime.Gosched()
Expand Down