Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/filters: send rpctransactions in pending-subscription #26126

Merged
merged 2 commits into from
Nov 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,14 @@ func (fb *filterBackend) ServiceFilter(ctx context.Context, ms *bloombits.Matche
panic("not supported")
}

func (fb *filterBackend) ChainConfig() *params.ChainConfig {
panic("not supported")
}

func (fb *filterBackend) CurrentHeader() *types.Header {
panic("not supported")
}

func nullSubscription() event.Subscription {
return event.NewSubscription(func(quit <-chan struct{}) error {
<-quit
Expand Down
6 changes: 5 additions & 1 deletion eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/rpc"
)

Expand Down Expand Up @@ -147,15 +148,18 @@ func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool)
go func() {
txs := make(chan []*types.Transaction, 128)
pendingTxSub := api.events.SubscribePendingTxs(txs)
chainConfig := api.sys.backend.ChainConfig()

for {
select {
case txs := <-txs:
// To keep the original behaviour, send a single tx hash in one notification.
// TODO(rjl493456442) Send a batch of tx hashes in one notification
latest := api.sys.backend.CurrentHeader()
for _, tx := range txs {
if fullTx != nil && *fullTx {
notifier.Notify(rpcSub.ID, tx)
rpcTx := ethapi.NewRPCPendingTransaction(tx, latest, chainConfig)
notifier.Notify(rpcSub.ID, rpcTx)
} else {
notifier.Notify(rpcSub.ID, tx.Hash())
}
Expand Down
3 changes: 3 additions & 0 deletions eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
lru "github.com/hashicorp/golang-lru"
)
Expand Down Expand Up @@ -61,6 +62,8 @@ type Backend interface {
GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error)
PendingBlockAndReceipts() (*types.Block, types.Receipts)

CurrentHeader() *types.Header
ChainConfig() *params.ChainConfig
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
Expand Down
8 changes: 8 additions & 0 deletions eth/filters/filter_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ type testBackend struct {
chainFeed event.Feed
}

func (b *testBackend) ChainConfig() *params.ChainConfig {
panic("implement me")
}

func (b *testBackend) CurrentHeader() *types.Header {
panic("implement me")
}

func (b *testBackend) ChainDb() ethdb.Database {
return b.db
}
Expand Down
16 changes: 8 additions & 8 deletions internal/ethapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,15 +171,15 @@ func (s *TxPoolAPI) Content() map[string]map[string]map[string]*RPCTransaction {
for account, txs := range pending {
dump := make(map[string]*RPCTransaction)
for _, tx := range txs {
dump[fmt.Sprintf("%d", tx.Nonce())] = newRPCPendingTransaction(tx, curHeader, s.b.ChainConfig())
dump[fmt.Sprintf("%d", tx.Nonce())] = NewRPCPendingTransaction(tx, curHeader, s.b.ChainConfig())
}
content["pending"][account.Hex()] = dump
}
// Flatten the queued transactions
for account, txs := range queue {
dump := make(map[string]*RPCTransaction)
for _, tx := range txs {
dump[fmt.Sprintf("%d", tx.Nonce())] = newRPCPendingTransaction(tx, curHeader, s.b.ChainConfig())
dump[fmt.Sprintf("%d", tx.Nonce())] = NewRPCPendingTransaction(tx, curHeader, s.b.ChainConfig())
}
content["queued"][account.Hex()] = dump
}
Expand All @@ -195,14 +195,14 @@ func (s *TxPoolAPI) ContentFrom(addr common.Address) map[string]map[string]*RPCT
// Build the pending transactions
dump := make(map[string]*RPCTransaction, len(pending))
for _, tx := range pending {
dump[fmt.Sprintf("%d", tx.Nonce())] = newRPCPendingTransaction(tx, curHeader, s.b.ChainConfig())
dump[fmt.Sprintf("%d", tx.Nonce())] = NewRPCPendingTransaction(tx, curHeader, s.b.ChainConfig())
}
content["pending"] = dump

// Build the queued transactions
dump = make(map[string]*RPCTransaction, len(queue))
for _, tx := range queue {
dump[fmt.Sprintf("%d", tx.Nonce())] = newRPCPendingTransaction(tx, curHeader, s.b.ChainConfig())
dump[fmt.Sprintf("%d", tx.Nonce())] = NewRPCPendingTransaction(tx, curHeader, s.b.ChainConfig())
}
content["queued"] = dump

Expand Down Expand Up @@ -1344,8 +1344,8 @@ func newRPCTransaction(tx *types.Transaction, blockHash common.Hash, blockNumber
return result
}

// newRPCPendingTransaction returns a pending transaction that will serialize to the RPC representation
func newRPCPendingTransaction(tx *types.Transaction, current *types.Header, config *params.ChainConfig) *RPCTransaction {
// NewRPCPendingTransaction returns a pending transaction that will serialize to the RPC representation
func NewRPCPendingTransaction(tx *types.Transaction, current *types.Header, config *params.ChainConfig) *RPCTransaction {
var baseFee *big.Int
blockNumber := uint64(0)
if current != nil {
Expand Down Expand Up @@ -1577,7 +1577,7 @@ func (s *TransactionAPI) GetTransactionByHash(ctx context.Context, hash common.H
}
// No finalized transaction, try to retrieve it from the pool
if tx := s.b.GetPoolTransaction(hash); tx != nil {
return newRPCPendingTransaction(tx, s.b.CurrentHeader(), s.b.ChainConfig()), nil
return NewRPCPendingTransaction(tx, s.b.CurrentHeader(), s.b.ChainConfig()), nil
}

// Transaction unknown, return as such
Expand Down Expand Up @@ -1847,7 +1847,7 @@ func (s *TransactionAPI) PendingTransactions() ([]*RPCTransaction, error) {
for _, tx := range pending {
from, _ := types.Sender(s.signer, tx)
if _, exists := accounts[from]; exists {
transactions = append(transactions, newRPCPendingTransaction(tx, curHeader, s.b.ChainConfig()))
transactions = append(transactions, NewRPCPendingTransaction(tx, curHeader, s.b.ChainConfig()))
}
}
return transactions, nil
Expand Down
10 changes: 8 additions & 2 deletions internal/ethapi/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/eth/filters"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/params"
Expand Down Expand Up @@ -87,9 +87,15 @@ type Backend interface {
ChainConfig() *params.ChainConfig
Engine() consensus.Engine

// This is copied from filters.Backend
// eth/filters needs to be initialized from this backend type, so methods needed by
// it must also be included here.
filters.Backend
GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error)
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription
BloomStatus() (uint64, uint64)
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
}

func GetAPIs(apiBackend Backend) []rpc.API {
Expand Down