From 90fd01423ac2b7e61463e673df4732832fb7d435 Mon Sep 17 00:00:00 2001 From: KeefeL <90749943+KeefeL@users.noreply.github.com> Date: Thu, 25 Nov 2021 11:00:14 +0800 Subject: [PATCH] [R4R]reannounce local pending transactions (#570) * reannouce local pending transactions * add tests for tx_pool reannouce local pending transactions * add tests for handler reannounce local pending transactions --- cmd/geth/main.go | 1 + cmd/geth/usage.go | 1 + cmd/utils/flags.go | 8 +++++ core/events.go | 3 ++ core/tx_pool.go | 72 ++++++++++++++++++++++++++++++++------- core/tx_pool_test.go | 41 ++++++++++++++++++++++ core/types/transaction.go | 5 +++ eth/handler.go | 51 +++++++++++++++++++++++++++ eth/handler_eth_test.go | 53 ++++++++++++++++++++++++++++ eth/handler_test.go | 23 +++++++++++-- eth/peerset.go | 16 +++++++++ 11 files changed, 259 insertions(+), 15 deletions(-) diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 8be8d20bf4..e896c7d659 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -90,6 +90,7 @@ var ( utils.TxPoolAccountQueueFlag, utils.TxPoolGlobalQueueFlag, utils.TxPoolLifetimeFlag, + utils.TxPoolReannounceTimeFlag, utils.SyncModeFlag, utils.ExitWhenSyncedFlag, utils.GCModeFlag, diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index 2a208c827b..fba14530b5 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -108,6 +108,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{ utils.TxPoolAccountQueueFlag, utils.TxPoolGlobalQueueFlag, utils.TxPoolLifetimeFlag, + utils.TxPoolReannounceTimeFlag, }, }, { diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 49ea0d1de1..67632c031b 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -398,6 +398,11 @@ var ( Usage: "Maximum amount of time non-executable transaction are queued", Value: ethconfig.Defaults.TxPool.Lifetime, } + TxPoolReannounceTimeFlag = cli.DurationFlag{ + Name: "txpool.reannouncetime", + Usage: "Duration for announcing local pending transactions again (default = 10 years, minimum = 1 minute)", + Value: ethconfig.Defaults.TxPool.ReannounceTime, + } // Performance tuning settings CacheFlag = cli.IntFlag{ Name: "cache", @@ -1410,6 +1415,9 @@ func setTxPool(ctx *cli.Context, cfg *core.TxPoolConfig) { if ctx.GlobalIsSet(TxPoolLifetimeFlag.Name) { cfg.Lifetime = ctx.GlobalDuration(TxPoolLifetimeFlag.Name) } + if ctx.GlobalIsSet(TxPoolReannounceTimeFlag.Name) { + cfg.ReannounceTime = ctx.GlobalDuration(TxPoolReannounceTimeFlag.Name) + } } func setEthash(ctx *cli.Context, cfg *ethconfig.Config) { diff --git a/core/events.go b/core/events.go index ac935a137f..5e730a24a7 100644 --- a/core/events.go +++ b/core/events.go @@ -24,6 +24,9 @@ import ( // NewTxsEvent is posted when a batch of transactions enter the transaction pool. type NewTxsEvent struct{ Txs []*types.Transaction } +// ReannoTxsEvent is posted when a batch of local pending transactions exceed a specified duration. +type ReannoTxsEvent struct{ Txs []*types.Transaction } + // NewMinedBlockEvent is posted when a block has been imported. type NewMinedBlockEvent struct{ Block *types.Block } diff --git a/core/tx_pool.go b/core/tx_pool.go index d0304857c3..4c08bc72b3 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -49,6 +49,9 @@ const ( // more expensive to propagate; larger transactions also take more resources // to validate whether they fit into the pool or not. txMaxSize = 4 * txSlotSize // 128KB + + // txReannoMaxNum is the maximum number of transactions a reannounce action can include. + txReannoMaxNum = 1024 ) var ( @@ -88,6 +91,7 @@ var ( var ( evictionInterval = time.Minute // Time interval to check for evictable transactions statsReportInterval = 8 * time.Second // Time interval to report transaction pool stats + reannounceInterval = time.Minute // Time interval to check for reannounce transactions ) var ( @@ -152,7 +156,8 @@ type TxPoolConfig struct { AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts - Lifetime time.Duration // Maximum amount of time non-executable transaction are queued + Lifetime time.Duration // Maximum amount of time non-executable transaction are queued + ReannounceTime time.Duration // Duration for announcing local pending transactions again } // DefaultTxPoolConfig contains the default configurations for the transaction @@ -169,7 +174,8 @@ var DefaultTxPoolConfig = TxPoolConfig{ AccountQueue: 64, GlobalQueue: 1024, - Lifetime: 3 * time.Hour, + Lifetime: 3 * time.Hour, + ReannounceTime: 10 * 365 * 24 * time.Hour, } // sanitize checks the provided user configurations and changes anything that's @@ -208,6 +214,10 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig { log.Warn("Sanitizing invalid txpool lifetime", "provided", conf.Lifetime, "updated", DefaultTxPoolConfig.Lifetime) conf.Lifetime = DefaultTxPoolConfig.Lifetime } + if conf.ReannounceTime < time.Minute { + log.Warn("Sanitizing invalid txpool reannounce time", "provided", conf.ReannounceTime, "updated", time.Minute) + conf.ReannounceTime = time.Minute + } return conf } @@ -219,14 +229,15 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig { // current state) and future transactions. Transactions move between those // two states over time as they are received and processed. type TxPool struct { - config TxPoolConfig - chainconfig *params.ChainConfig - chain blockChain - gasPrice *big.Int - txFeed event.Feed - scope event.SubscriptionScope - signer types.Signer - mu sync.RWMutex + config TxPoolConfig + chainconfig *params.ChainConfig + chain blockChain + gasPrice *big.Int + txFeed event.Feed + reannoTxFeed event.Feed // Event feed for announcing transactions again + scope event.SubscriptionScope + signer types.Signer + mu sync.RWMutex istanbul bool // Fork indicator whether we are in the istanbul stage. eip2718 bool // Fork indicator whether we are using EIP-2718 type transactions. @@ -323,14 +334,16 @@ func (pool *TxPool) loop() { var ( prevPending, prevQueued, prevStales int // Start the stats reporting and transaction eviction tickers - report = time.NewTicker(statsReportInterval) - evict = time.NewTicker(evictionInterval) - journal = time.NewTicker(pool.config.Rejournal) + report = time.NewTicker(statsReportInterval) + evict = time.NewTicker(evictionInterval) + reannounce = time.NewTicker(reannounceInterval) + journal = time.NewTicker(pool.config.Rejournal) // Track the previous head headers for transaction reorgs head = pool.chain.CurrentBlock() ) defer report.Stop() defer evict.Stop() + defer reannounce.Stop() defer journal.Stop() for { @@ -378,6 +391,33 @@ func (pool *TxPool) loop() { } pool.mu.Unlock() + case <-reannounce.C: + pool.mu.RLock() + reannoTxs := func() []*types.Transaction { + txs := make([]*types.Transaction, 0) + for addr, list := range pool.pending { + if !pool.locals.contains(addr) { + continue + } + + for _, tx := range list.Flatten() { + // Default ReannounceTime is 10 years, won't announce by default. + if time.Since(tx.Time()) < pool.config.ReannounceTime { + break + } + txs = append(txs, tx) + if len(txs) >= txReannoMaxNum { + return txs + } + } + } + return txs + }() + pool.mu.RUnlock() + if len(reannoTxs) > 0 { + pool.reannoTxFeed.Send(ReannoTxsEvent{reannoTxs}) + } + // Handle local transaction journal rotation case <-journal.C: if pool.journal != nil { @@ -412,6 +452,12 @@ func (pool *TxPool) SubscribeNewTxsEvent(ch chan<- NewTxsEvent) event.Subscripti return pool.scope.Track(pool.txFeed.Subscribe(ch)) } +// SubscribeReannoTxsEvent registers a subscription of ReannoTxsEvent and +// starts sending event to the given channel. +func (pool *TxPool) SubscribeReannoTxsEvent(ch chan<- ReannoTxsEvent) event.Subscription { + return pool.scope.Track(pool.reannoTxFeed.Subscribe(ch)) +} + // GasPrice returns the current gas price enforced by the transaction pool. func (pool *TxPool) GasPrice() *big.Int { pool.mu.RLock() diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index 5d555f5a9c..5f27631e15 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -1933,6 +1933,47 @@ func TestTransactionSlotCount(t *testing.T) { } } +// Tests the local pending transaction announced again correctly. +func TestTransactionPendingReannouce(t *testing.T) { + t.Parallel() + + // Create the pool to test the limit enforcement with + statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) + blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)} + + config := testTxPoolConfig + // This ReannounceTime will be modified to time.Minute when creating tx_pool. + config.ReannounceTime = time.Second + reannounceInterval = time.Second + + pool := NewTxPool(config, params.TestChainConfig, blockchain) + // Modify ReannounceTime to trigger quicker. + pool.config.ReannounceTime = time.Second + defer pool.Stop() + + key, _ := crypto.GenerateKey() + account := crypto.PubkeyToAddress(key.PublicKey) + pool.currentState.AddBalance(account, big.NewInt(1000000)) + + events := make(chan ReannoTxsEvent, testTxPoolConfig.AccountQueue) + sub := pool.reannoTxFeed.Subscribe(events) + defer sub.Unsubscribe() + + // Generate a batch of transactions and add to tx_pool locally. + txs := make([]*types.Transaction, 0, testTxPoolConfig.AccountQueue) + for i := uint64(0); i < testTxPoolConfig.AccountQueue; i++ { + txs = append(txs, transaction(i, 100000, key)) + } + pool.AddLocals(txs) + + select { + case ev := <-events: + t.Logf("received reannouce event, txs length: %d", len(ev.Txs)) + case <-time.After(5 * time.Second): + t.Errorf("reannouce event not fired") + } +} + // Benchmarks the speed of validating the contents of the pending queue of the // transaction pool. func BenchmarkPendingDemotion100(b *testing.B) { benchmarkPendingDemotion(b, 100) } diff --git a/core/types/transaction.go b/core/types/transaction.go index b127cb2af6..74c011544b 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -82,6 +82,11 @@ type TxData interface { setSignatureValues(chainID, v, r, s *big.Int) } +// Time returns transaction's time +func (tx *Transaction) Time() time.Time { + return tx.time +} + // EncodeRLP implements rlp.Encoder func (tx *Transaction) EncodeRLP(w io.Writer) error { if tx.Type() == LegacyTxType { diff --git a/eth/handler.go b/eth/handler.go index f00f955b34..9eb448040b 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -73,6 +73,10 @@ type txPool interface { // SubscribeNewTxsEvent should return an event subscription of // NewTxsEvent and send events to the given channel. SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription + + // SubscribeReannoTxsEvent should return an event subscription of + // ReannoTxsEvent and send events to the given channel. + SubscribeReannoTxsEvent(chan<- core.ReannoTxsEvent) event.Subscription } // handlerConfig is the collection of initialization parameters to create a full @@ -120,6 +124,8 @@ type handler struct { eventMux *event.TypeMux txsCh chan core.NewTxsEvent txsSub event.Subscription + reannoTxsCh chan core.ReannoTxsEvent + reannoTxsSub event.Subscription minedBlockSub *event.TypeMuxSubscription whitelist map[uint64]common.Hash @@ -432,6 +438,12 @@ func (h *handler) Start(maxPeers int) { h.txsSub = h.txpool.SubscribeNewTxsEvent(h.txsCh) go h.txBroadcastLoop() + // announce local pending transactions again + h.wg.Add(1) + h.reannoTxsCh = make(chan core.ReannoTxsEvent, txChanSize) + h.reannoTxsSub = h.txpool.SubscribeReannoTxsEvent(h.reannoTxsCh) + go h.txReannounceLoop() + // broadcast mined blocks h.wg.Add(1) h.minedBlockSub = h.eventMux.Subscribe(core.NewMinedBlockEvent{}) @@ -445,6 +457,7 @@ func (h *handler) Start(maxPeers int) { func (h *handler) Stop() { h.txsSub.Unsubscribe() // quits txBroadcastLoop + h.reannoTxsSub.Unsubscribe() // quits txReannounceLoop h.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop // Quit chainSync and txsync64. @@ -549,6 +562,31 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { "tx packs", directPeers, "broadcast txs", directCount) } +// ReannounceTransactions will announce a batch of local pending transactions +// to a square root of all peers. +func (h *handler) ReannounceTransactions(txs types.Transactions) { + var ( + annoCount int // Count of announcements made + annos = make(map[*ethPeer][]common.Hash) // Set peer->hash to announce + ) + + // Announce transactions hash to a batch of peers + peersCount := uint(math.Sqrt(float64(h.peers.len()))) + peers := h.peers.headPeers(peersCount) + for _, tx := range txs { + for _, peer := range peers { + annos[peer] = append(annos[peer], tx.Hash()) + } + } + + for peer, hashes := range annos { + annoCount += len(hashes) + peer.AsyncSendPooledTransactionHashes(hashes) + } + log.Debug("Transaction reannounce", "txs", len(txs), + "announce packs", peersCount, "announced hashes", annoCount) +} + // minedBroadcastLoop sends mined blocks to connected peers. func (h *handler) minedBroadcastLoop() { defer h.wg.Done() @@ -573,3 +611,16 @@ func (h *handler) txBroadcastLoop() { } } } + +// txReannounceLoop announces local pending transactions to connected peers again. +func (h *handler) txReannounceLoop() { + defer h.wg.Done() + for { + select { + case event := <-h.reannoTxsCh: + h.ReannounceTransactions(event.Txs) + case <-h.reannoTxsSub.Err(): + return + } + } +} diff --git a/eth/handler_eth_test.go b/eth/handler_eth_test.go index 271bae07c7..aad2c72b1b 100644 --- a/eth/handler_eth_test.go +++ b/eth/handler_eth_test.go @@ -450,6 +450,59 @@ func testTransactionPropagation(t *testing.T, protocol uint) { } } +// Tests that local pending transactions get propagated to peers. +func TestTransactionPendingReannounce(t *testing.T) { + t.Parallel() + + // Create a source handler to announce transactions from and a sink handler + // to receive them. + source := newTestHandler() + defer source.close() + + sink := newTestHandler() + defer sink.close() + sink.handler.acceptTxs = 1 // mark synced to accept transactions + + sourcePipe, sinkPipe := p2p.MsgPipe() + defer sourcePipe.Close() + defer sinkPipe.Close() + + sourcePeer := eth.NewPeer(eth.ETH65, p2p.NewPeer(enode.ID{0}, "", nil), sourcePipe, source.txpool) + sinkPeer := eth.NewPeer(eth.ETH65, p2p.NewPeer(enode.ID{0}, "", nil), sinkPipe, sink.txpool) + defer sourcePeer.Close() + defer sinkPeer.Close() + + go source.handler.runEthPeer(sourcePeer, func(peer *eth.Peer) error { + return eth.Handle((*ethHandler)(source.handler), peer) + }) + go sink.handler.runEthPeer(sinkPeer, func(peer *eth.Peer) error { + return eth.Handle((*ethHandler)(sink.handler), peer) + }) + + // Subscribe transaction pools + txCh := make(chan core.NewTxsEvent, 1024) + sub := sink.txpool.SubscribeNewTxsEvent(txCh) + defer sub.Unsubscribe() + + txs := make([]*types.Transaction, 64) + for nonce := range txs { + tx := types.NewTransaction(uint64(nonce), common.Address{}, big.NewInt(0), 100000, big.NewInt(0), nil) + tx, _ = types.SignTx(tx, types.HomesteadSigner{}, testKey) + + txs[nonce] = tx + } + source.txpool.ReannouceTransactions(txs) + + for arrived := 0; arrived < len(txs); { + select { + case event := <-txCh: + arrived += len(event.Txs) + case <-time.NewTimer(time.Second).C: + t.Errorf("sink: transaction propagation timed out: have %d, want %d", arrived, len(txs)) + } + } +} + // Tests that post eth protocol handshake, clients perform a mutual checkpoint // challenge to validate each other's chains. Hash mismatches, or missing ones // during a fast sync should lead to the peer getting dropped. diff --git a/eth/handler_test.go b/eth/handler_test.go index a90ef5c348..c3b7b769b2 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -48,8 +48,9 @@ var ( type testTxPool struct { pool map[common.Hash]*types.Transaction // Hash map of collected transactions - txFeed event.Feed // Notification feed to allow waiting for inclusion - lock sync.RWMutex // Protects the transaction pool + txFeed event.Feed // Notification feed to allow waiting for inclusion + reannoTxFeed event.Feed // Notification feed to trigger reannouce + lock sync.RWMutex // Protects the transaction pool } // newTestTxPool creates a mock transaction pool. @@ -90,6 +91,18 @@ func (p *testTxPool) AddRemotes(txs []*types.Transaction) []error { return make([]error, len(txs)) } +// ReannouceTransactions announce the transactions to some peers. +func (p *testTxPool) ReannouceTransactions(txs []*types.Transaction) []error { + p.lock.Lock() + defer p.lock.Unlock() + + for _, tx := range txs { + p.pool[tx.Hash()] = tx + } + p.reannoTxFeed.Send(core.ReannoTxsEvent{Txs: txs}) + return make([]error, len(txs)) +} + // Pending returns all the transactions known to the pool func (p *testTxPool) Pending() (map[common.Address]types.Transactions, error) { p.lock.RLock() @@ -112,6 +125,12 @@ func (p *testTxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subs return p.txFeed.Subscribe(ch) } +// SubscribeReannoTxsEvent should return an event subscription of ReannoTxsEvent and +// send events to the given channel. +func (p *testTxPool) SubscribeReannoTxsEvent(ch chan<- core.ReannoTxsEvent) event.Subscription { + return p.reannoTxFeed.Subscribe(ch) +} + // testHandler is a live implementation of the Ethereum protocol handler, just // preinitialized with some sane testing defaults and the transaction pool mocked // out. diff --git a/eth/peerset.go b/eth/peerset.go index f0955f34c6..220b01d832 100644 --- a/eth/peerset.go +++ b/eth/peerset.go @@ -266,6 +266,22 @@ func (ps *peerSet) peer(id string) *ethPeer { return ps.peers[id] } +// headPeers retrieves a specified number list of peers. +func (ps *peerSet) headPeers(num uint) []*ethPeer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + if num > uint(len(ps.peers)) { + num = uint(len(ps.peers)) + } + + list := make([]*ethPeer, 0, num) + for _, p := range ps.peers { + list = append(list, p) + } + return list +} + // peersWithoutBlock retrieves a list of peers that do not have a given block in // their set of known hashes so it might be propagated to them. func (ps *peerSet) peersWithoutBlock(hash common.Hash) []*ethPeer {