Skip to content

Commit

Permalink
eth/filters: fix a breaking change and return rpctransaction (ethereu…
Browse files Browse the repository at this point in the history
…m#26757)

* eth/filters: fix a breaking change and return rpctransaction

* eth/filters: fix test cases

---------

Co-authored-by: Catror <me@catror.com>
  • Loading branch information
yierx and catror authored Feb 22, 2023
1 parent fe01a2f commit 4034c67
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 15 deletions.
26 changes: 21 additions & 5 deletions eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type filter struct {
typ Type
deadline *time.Timer // filter is inactive when deadline triggers
hashes []common.Hash
fullTx bool
txs []*types.Transaction
crit FilterCriteria
logs []*types.Log
Expand Down Expand Up @@ -103,14 +104,14 @@ func (api *FilterAPI) timeoutLoop(timeout time.Duration) {
//
// It is part of the filter package because this filter can be used through the
// `eth_getFilterChanges` polling method that is also used for log filters.
func (api *FilterAPI) NewPendingTransactionFilter() rpc.ID {
func (api *FilterAPI) NewPendingTransactionFilter(fullTx *bool) rpc.ID {
var (
pendingTxs = make(chan []*types.Transaction)
pendingTxSub = api.events.SubscribePendingTxs(pendingTxs)
)

api.filtersMu.Lock()
api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), txs: make([]*types.Transaction, 0), s: pendingTxSub}
api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, fullTx: fullTx != nil && *fullTx, deadline: time.NewTimer(api.timeout), txs: make([]*types.Transaction, 0), s: pendingTxSub}
api.filtersMu.Unlock()

go func() {
Expand Down Expand Up @@ -412,6 +413,9 @@ func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
api.filtersMu.Lock()
defer api.filtersMu.Unlock()

chainConfig := api.sys.backend.ChainConfig()
latest := api.sys.backend.CurrentHeader()

if f, found := api.filters[id]; found {
if !f.deadline.Stop() {
// timer expired but filter is not yet removed in timeout loop
Expand All @@ -426,9 +430,21 @@ func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
f.hashes = nil
return returnHashes(hashes), nil
case PendingTransactionsSubscription:
txs := f.txs
f.txs = nil
return txs, nil
if f.fullTx {
txs := make([]*ethapi.RPCTransaction, 0, len(f.txs))
for _, tx := range f.txs {
txs = append(txs, ethapi.NewRPCPendingTransaction(tx, latest, chainConfig))
}
f.txs = nil
return txs, nil
} else {
hashes := make([]common.Hash, 0, len(f.txs))
for _, tx := range f.txs {
hashes = append(hashes, tx.Hash())
}
f.txs = nil
return hashes, nil
}
case LogsSubscription, MinedAndPendingLogsSubscription:
logs := f.logs
f.logs = nil
Expand Down
79 changes: 69 additions & 10 deletions eth/filters/filter_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
)
Expand All @@ -52,11 +53,12 @@ type testBackend struct {
}

func (b *testBackend) ChainConfig() *params.ChainConfig {
panic("implement me")
return params.TestChainConfig
}

func (b *testBackend) CurrentHeader() *types.Header {
panic("implement me")
hdr, _ := b.HeaderByNumber(context.TODO(), rpc.LatestBlockNumber)
return hdr
}

func (b *testBackend) ChainDb() ethdb.Database {
Expand Down Expand Up @@ -256,10 +258,10 @@ func TestPendingTxFilter(t *testing.T) {
types.NewTransaction(4, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
}

txs []*types.Transaction
hashes []common.Hash
)

fid0 := api.NewPendingTransactionFilter()
fid0 := api.NewPendingTransactionFilter(nil)

time.Sleep(1 * time.Second)
backend.txFeed.Send(core.NewTxsEvent{Txs: transactions})
Expand All @@ -271,7 +273,64 @@ func TestPendingTxFilter(t *testing.T) {
t.Fatalf("Unable to retrieve logs: %v", err)
}

tx := results.([]*types.Transaction)
h := results.([]common.Hash)
hashes = append(hashes, h...)
if len(hashes) >= len(transactions) {
break
}
// check timeout
if time.Now().After(timeout) {
break
}

time.Sleep(100 * time.Millisecond)
}

if len(hashes) != len(transactions) {
t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(hashes))
return
}
for i := range hashes {
if hashes[i] != transactions[i].Hash() {
t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), hashes[i])
}
}
}

// TestPendingTxFilterFullTx tests whether pending tx filters retrieve all pending transactions that are posted to the event mux.
func TestPendingTxFilterFullTx(t *testing.T) {
t.Parallel()

var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(t, db, Config{})
api = NewFilterAPI(sys, false)

transactions = []*types.Transaction{
types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
types.NewTransaction(1, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
types.NewTransaction(2, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
types.NewTransaction(3, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
types.NewTransaction(4, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
}

txs []*ethapi.RPCTransaction
)

fullTx := true
fid0 := api.NewPendingTransactionFilter(&fullTx)

time.Sleep(1 * time.Second)
backend.txFeed.Send(core.NewTxsEvent{Txs: transactions})

timeout := time.Now().Add(1 * time.Second)
for {
results, err := api.GetFilterChanges(fid0)
if err != nil {
t.Fatalf("Unable to retrieve logs: %v", err)
}

tx := results.([]*ethapi.RPCTransaction)
txs = append(txs, tx...)
if len(txs) >= len(transactions) {
break
Expand All @@ -289,8 +348,8 @@ func TestPendingTxFilter(t *testing.T) {
return
}
for i := range txs {
if txs[i].Hash() != transactions[i].Hash() {
t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), txs[i].Hash())
if txs[i].Hash != transactions[i].Hash() {
t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), txs[i].Hash)
}
}
}
Expand Down Expand Up @@ -854,15 +913,15 @@ func TestPendingTxFilterDeadlock(t *testing.T) {
// timeout either in 100ms or 200ms
fids := make([]rpc.ID, 20)
for i := 0; i < len(fids); i++ {
fid := api.NewPendingTransactionFilter()
fid := api.NewPendingTransactionFilter(nil)
fids[i] = fid
// Wait for at least one tx to arrive in filter
for {
txs, err := api.GetFilterChanges(fid)
hashes, err := api.GetFilterChanges(fid)
if err != nil {
t.Fatalf("Filter should exist: %v\n", err)
}
if len(txs.([]*types.Transaction)) > 0 {
if len(hashes.([]common.Hash)) > 0 {
break
}
runtime.Gosched()
Expand Down

0 comments on commit 4034c67

Please sign in to comment.