Skip to content

Commit

Permalink
Merge pull request ethereum#118 from mdehoog/disable-pending-txpool-a…
Browse files Browse the repository at this point in the history
…ccess

Disable access to pending txs from filters + subscriptions
  • Loading branch information
trianglesphere authored Aug 22, 2023
2 parents bb04c5f + d2110a0 commit 88c9fa3
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 10 deletions.
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ var (
utils.RollupHistoricalRPCTimeoutFlag,
utils.RollupDisableTxPoolGossipFlag,
utils.RollupComputePendingBlock,
utils.RollupAllowPendingTxFilters,
configFileFlag,
}, utils.NetworkFlags, utils.DatabasePathFlags)

Expand Down
9 changes: 8 additions & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,11 @@ var (
Usage: "By default the pending block equals the latest block to save resources and not leak txs from the tx-pool, this flag enables computing of the pending block from the tx-pool instead.",
Category: flags.RollupCategory,
}
RollupAllowPendingTxFilters = &cli.BoolFlag{
Name: "rollup.allowpendingtxfilters",
Usage: "By default 'eth_subscribe' with 'NewPendingTransaction' and 'eth_newPendingTransactionFilter' are disabled to prevent leaking txs from the tx-pool.",
Category: flags.RollupCategory,
}

// Metrics flags
MetricsEnabledFlag = &cli.BoolFlag{
Expand Down Expand Up @@ -1833,6 +1838,7 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
cfg.RollupHistoricalRPCTimeout = ctx.Duration(RollupHistoricalRPCTimeoutFlag.Name)
}
cfg.RollupDisableTxPoolGossip = ctx.Bool(RollupDisableTxPoolGossipFlag.Name)
cfg.RollupAllowPendingTxFilters = ctx.Bool(RollupAllowPendingTxFilters.Name)
// Override any default configs for hard coded networks.
switch {
case ctx.Bool(MainnetFlag.Name):
Expand Down Expand Up @@ -2025,7 +2031,8 @@ func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, filterSyst
func RegisterFilterAPI(stack *node.Node, backend ethapi.Backend, ethcfg *ethconfig.Config) *filters.FilterSystem {
isLightClient := ethcfg.SyncMode == downloader.LightSync
filterSystem := filters.NewFilterSystem(backend, filters.Config{
LogCacheSize: ethcfg.FilterLogCacheSize,
LogCacheSize: ethcfg.FilterLogCacheSize,
AllowPendingTxs: ethcfg.RollupAllowPendingTxFilters,
})
stack.RegisterAPIs([]rpc.API{{
Namespace: "eth",
Expand Down
9 changes: 5 additions & 4 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,11 @@ type Config struct {
OverrideOptimismRegolith *uint64 `toml:",omitempty"`
OverrideOptimism *bool

RollupSequencerHTTP string
RollupHistoricalRPC string
RollupHistoricalRPCTimeout time.Duration
RollupDisableTxPoolGossip bool
RollupSequencerHTTP string
RollupHistoricalRPC string
RollupHistoricalRPCTimeout time.Duration
RollupDisableTxPoolGossip bool
RollupAllowPendingTxFilters bool
}

// CreateConsensusEngine creates a consensus engine for the given chain config.
Expand Down
14 changes: 12 additions & 2 deletions eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (
var (
errInvalidTopic = errors.New("invalid topic(s)")
errFilterNotFound = errors.New("filter not found")
// errPendingDisabled is returned from NewPendingTransaction* when access to the mempool is not allowed
errPendingDisabled = errors.New("pending tx filters are disabled")
)

// filter is a helper struct that holds meta information over the filter type
Expand Down Expand Up @@ -109,7 +111,11 @@ func (api *FilterAPI) timeoutLoop(timeout time.Duration) {
//
// It is part of the filter package because this filter can be used through the
// `eth_getFilterChanges` polling method that is also used for log filters.
func (api *FilterAPI) NewPendingTransactionFilter(fullTx *bool) rpc.ID {
func (api *FilterAPI) NewPendingTransactionFilter(fullTx *bool) (rpc.ID, error) {
if !api.sys.cfg.AllowPendingTxs {
return "", errPendingDisabled
}

var (
pendingTxs = make(chan []*types.Transaction)
pendingTxSub = api.events.SubscribePendingTxs(pendingTxs)
Expand Down Expand Up @@ -137,13 +143,17 @@ func (api *FilterAPI) NewPendingTransactionFilter(fullTx *bool) rpc.ID {
}
}()

return pendingTxSub.ID
return pendingTxSub.ID, nil
}

// NewPendingTransactions creates a subscription that is triggered each time a
// transaction enters the transaction pool. If fullTx is true the full tx is
// sent to the client, otherwise the hash is sent.
func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (*rpc.Subscription, error) {
if !api.sys.cfg.AllowPendingTxs {
return &rpc.Subscription{}, errPendingDisabled
}

notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
Expand Down
2 changes: 2 additions & 0 deletions eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ import (
type Config struct {
LogCacheSize int // maximum number of cached blocks (default: 32)
Timeout time.Duration // how long filters stay active (default: 5min)
// allow filtering or subscriptions to new pending txs:
AllowPendingTxs bool
}

func (cfg Config) withDefaults() Config {
Expand Down
16 changes: 13 additions & 3 deletions eth/filters/filter_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.Matc

func newTestFilterSystem(t testing.TB, db ethdb.Database, cfg Config) (*testBackend, *FilterSystem) {
backend := &testBackend{db: db}
cfg.AllowPendingTxs = true
sys := NewFilterSystem(backend, cfg)
return backend, sys
}
Expand Down Expand Up @@ -263,7 +264,10 @@ func TestPendingTxFilter(t *testing.T) {
hashes []common.Hash
)

fid0 := api.NewPendingTransactionFilter(nil)
fid0, err := api.NewPendingTransactionFilter(nil)
if err != nil {
t.Fatalf("Unable to create filter: %v", err)
}

time.Sleep(1 * time.Second)
backend.txFeed.Send(core.NewTxsEvent{Txs: transactions})
Expand Down Expand Up @@ -320,7 +324,10 @@ func TestPendingTxFilterFullTx(t *testing.T) {
)

fullTx := true
fid0 := api.NewPendingTransactionFilter(&fullTx)
fid0, err := api.NewPendingTransactionFilter(&fullTx)
if err != nil {
t.Fatalf("Unable to create filter: %v", err)
}

time.Sleep(1 * time.Second)
backend.txFeed.Send(core.NewTxsEvent{Txs: transactions})
Expand Down Expand Up @@ -915,7 +922,10 @@ func TestPendingTxFilterDeadlock(t *testing.T) {
// timeout either in 100ms or 200ms
fids := make([]rpc.ID, 20)
for i := 0; i < len(fids); i++ {
fid := api.NewPendingTransactionFilter(nil)
fid, err := api.NewPendingTransactionFilter(nil)
if err != nil {
t.Fatalf("Unable to create filter: %v", err)
}
fids[i] = fid
// Wait for at least one tx to arrive in filter
for {
Expand Down

0 comments on commit 88c9fa3

Please sign in to comment.