From dc06d8977b97ee05799e856027f6f2f210a815bb Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Wed, 10 Aug 2022 14:02:03 +0200 Subject: [PATCH 01/10] eth/fetcher: introduce some lag in tx fetching --- eth/fetcher/tx_fetcher.go | 63 ++++++++++++++++++++++++++------------- 1 file changed, 43 insertions(+), 20 deletions(-) diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index 035e0c2ec7d8..8503a4abff36 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -275,32 +275,52 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) duplicate int64 underpriced int64 otherreject int64 + delay time.Time ) - errs := f.addTxs(txs) - for i, err := range errs { - // Track the transaction hash if the price is too low for us. - // Avoid re-request this transaction when we receive another - // announcement. - if errors.Is(err, core.ErrUnderpriced) || errors.Is(err, core.ErrReplaceUnderpriced) { - for f.underpriced.Cardinality() >= maxTxUnderpricedSetSize { - f.underpriced.Pop() - } - f.underpriced.Add(txs[i].Hash()) + // proceed in batches + for i := 0; i < len(txs); i += 100 { + end := i + 100 + if end > len(txs) { + end = len(txs) } - // Track a few interesting failure types - switch { - case err == nil: // Noop, but need to handle to not count these + batch := txs[i:end] + errs := f.addTxs(batch) + for j, err := range errs { + // Track the transaction hash if the price is too low for us. + // Avoid re-request this transaction when we receive another + // announcement. + if errors.Is(err, core.ErrUnderpriced) || errors.Is(err, core.ErrReplaceUnderpriced) { + for f.underpriced.Cardinality() >= maxTxUnderpricedSetSize { + f.underpriced.Pop() + } + f.underpriced.Add(batch[i].Hash()) + } + // Track a few interesting failure types + switch { + case err == nil: // Noop, but need to handle to not count these - case errors.Is(err, core.ErrAlreadyKnown): - duplicate++ + case errors.Is(err, core.ErrAlreadyKnown): + duplicate++ - case errors.Is(err, core.ErrUnderpriced) || errors.Is(err, core.ErrReplaceUnderpriced): - underpriced++ + case errors.Is(err, core.ErrUnderpriced) || errors.Is(err, core.ErrReplaceUnderpriced): + underpriced++ - default: - otherreject++ + default: + otherreject++ + } + added = append(added, batch[j].Hash()) + } + // If 'other reject' is >25% of the deliveries, abort + if 4*otherreject > len(added) { + delay = time.Millisecond * 200 + break + } + // If >50% of all transactions are rejected, abort. Either we or the peer + // are out of sync with the chain. + if 2*(duplicate+underpriced+otherreject) > len(added) { + delay = time.Millisecond * 200 + break } - added = append(added, txs[i].Hash()) } if direct { txReplyKnownMeter.Mark(duplicate) @@ -313,6 +333,9 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) } select { case f.cleanup <- &txDelivery{origin: peer, hashes: added, direct: direct}: + if delay > 0 { + time.Sleep(delay) + } return nil case <-f.quit: return errTerminated From 81f356e7ad65b44beab29ffb3f76efe694ac44d6 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Wed, 10 Aug 2022 16:00:15 +0200 Subject: [PATCH 02/10] eth/fetcher: change conditions a bit --- eth/fetcher/tx_fetcher.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index 8503a4abff36..5b4e1f35d00b 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -310,17 +310,12 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) } added = append(added, batch[j].Hash()) } - // If 'other reject' is >25% of the deliveries, abort + // If 'other reject' is >25% of the deliveries, abort. Either we are + // out of sync with the chain or the peer is griefing us. if 4*otherreject > len(added) { delay = time.Millisecond * 200 break } - // If >50% of all transactions are rejected, abort. Either we or the peer - // are out of sync with the chain. - if 2*(duplicate+underpriced+otherreject) > len(added) { - delay = time.Millisecond * 200 - break - } } if direct { txReplyKnownMeter.Mark(duplicate) From 9a49af1e499a4d2c3f86be00d2df26d1d442e086 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Fri, 12 Aug 2022 13:21:28 +0200 Subject: [PATCH 03/10] eth/fetcher: use per-batch quota check --- eth/fetcher/tx_fetcher.go | 45 ++++++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index 5b4e1f35d00b..4d4da140736c 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -262,20 +262,28 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error { // direct request replies. The differentiation is important so the fetcher can // re-schedule missing transactions as soon as possible. func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) error { - // Keep track of all the propagated transactions + var ( + inMeter, knownMeter, underpricedMeter, otherRejectMeter metrics.Meter + ) if direct { - txReplyInMeter.Mark(int64(len(txs))) + inMeter = txReplyInMeter + knownMeter = txReplyKnownMeter + underpricedMeter = txReplyUnderpricedMeter + otherRejectMeter = txReplyOtherRejectMeter } else { - txBroadcastInMeter.Mark(int64(len(txs))) + inMeter = txBroadcastInMeter + knownMeter = txBroadcastKnownMeter + underpricedMeter = txBroadcastUnderpricedMeter + otherRejectMeter = txBroadcastOtherRejectMeter } + // Keep track of all the propagated transactions + inMeter.Mark(int64(len(txs))) + // Push all the transactions into the pool, tracking underpriced ones to avoid // re-requesting them and dropping the peer in case of malicious transfers. var ( - added = make([]common.Hash, 0, len(txs)) - duplicate int64 - underpriced int64 - otherreject int64 - delay time.Time + added = make([]common.Hash, 0, len(txs)) + delay time.Duration ) // proceed in batches for i := 0; i < len(txs); i += 100 { @@ -283,6 +291,11 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) if end > len(txs) { end = len(txs) } + var ( + duplicate int64 + underpriced int64 + otherreject int64 + ) batch := txs[i:end] errs := f.addTxs(batch) for j, err := range errs { @@ -310,22 +323,16 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) } added = append(added, batch[j].Hash()) } - // If 'other reject' is >25% of the deliveries, abort. Either we are + knownMeter.Mark(duplicate) + underpricedMeter.Mark(underpriced) + otherRejectMeter.Mark(otherreject) + // If 'other reject' is >25% of the deliveries in any batch, abort. Either we are // out of sync with the chain or the peer is griefing us. - if 4*otherreject > len(added) { + if otherreject > 25 { delay = time.Millisecond * 200 break } } - if direct { - txReplyKnownMeter.Mark(duplicate) - txReplyUnderpricedMeter.Mark(underpriced) - txReplyOtherRejectMeter.Mark(otherreject) - } else { - txBroadcastKnownMeter.Mark(duplicate) - txBroadcastUnderpricedMeter.Mark(underpriced) - txBroadcastOtherRejectMeter.Mark(otherreject) - } select { case f.cleanup <- &txDelivery{origin: peer, hashes: added, direct: direct}: if delay > 0 { From f1b14f925c21e48c537e67fbcba94f8549b0ba57 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Fri, 19 Aug 2022 13:02:22 +0200 Subject: [PATCH 04/10] eth/fetcher: fix some comments --- eth/fetcher/tx_fetcher.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index 4d4da140736c..3aa6fe464874 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -263,14 +263,12 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error { // re-schedule missing transactions as soon as possible. func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) error { var ( - inMeter, knownMeter, underpricedMeter, otherRejectMeter metrics.Meter - ) - if direct { - inMeter = txReplyInMeter - knownMeter = txReplyKnownMeter + inMeter = txReplyInMeter + knownMeter = txReplyKnownMeter underpricedMeter = txReplyUnderpricedMeter otherRejectMeter = txReplyOtherRejectMeter - } else { + ) + if !direct { inMeter = txBroadcastInMeter knownMeter = txBroadcastKnownMeter underpricedMeter = txBroadcastUnderpricedMeter @@ -286,8 +284,8 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) delay time.Duration ) // proceed in batches - for i := 0; i < len(txs); i += 100 { - end := i + 100 + for i := 0; i < len(txs); i += 128 { + end := i + 128 if end > len(txs) { end = len(txs) } @@ -328,7 +326,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) otherRejectMeter.Mark(otherreject) // If 'other reject' is >25% of the deliveries in any batch, abort. Either we are // out of sync with the chain or the peer is griefing us. - if otherreject > 25 { + if otherreject > 128/4 { delay = time.Millisecond * 200 break } From de87588931ea2892b5d36c3d8b1dfd9639e14315 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Fri, 19 Aug 2022 13:04:24 +0200 Subject: [PATCH 05/10] eth/fetcher: address review concerns --- eth/fetcher/tx_fetcher.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index 3aa6fe464874..9a6d72f4187e 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -295,8 +295,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) otherreject int64 ) batch := txs[i:end] - errs := f.addTxs(batch) - for j, err := range errs { + for j, err := range f.addTxs(batch) { // Track the transaction hash if the price is too low for us. // Avoid re-request this transaction when we receive another // announcement. @@ -324,18 +323,17 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) knownMeter.Mark(duplicate) underpricedMeter.Mark(underpriced) otherRejectMeter.Mark(otherreject) + // If 'other reject' is >25% of the deliveries in any batch, abort. Either we are // out of sync with the chain or the peer is griefing us. if otherreject > 128/4 { - delay = time.Millisecond * 200 + delay = 200 * time.Millisecond break } } select { case f.cleanup <- &txDelivery{origin: peer, hashes: added, direct: direct}: - if delay > 0 { - time.Sleep(delay) - } + time.Sleep(delay) return nil case <-f.quit: return errTerminated From 653baedb89f0485d2805dac2742ac36c45d87fd0 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Fri, 19 Aug 2022 14:25:07 +0200 Subject: [PATCH 06/10] eth/fetcher: fix panic + add warn log --- eth/fetcher/tx_fetcher.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index 9a6d72f4187e..369951150155 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -303,7 +303,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) for f.underpriced.Cardinality() >= maxTxUnderpricedSetSize { f.underpriced.Pop() } - f.underpriced.Add(batch[i].Hash()) + f.underpriced.Add(batch[j].Hash()) } // Track a few interesting failure types switch { @@ -328,6 +328,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) // out of sync with the chain or the peer is griefing us. if otherreject > 128/4 { delay = 200 * time.Millisecond + log.Warn("Peer delivering useless transactions, sleeping", "ignored", len(txs)-i) break } } From fd88ae794e4e4fbf963ecdfa53f81a0dfc3a22aa Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Fri, 19 Aug 2022 14:28:25 +0200 Subject: [PATCH 07/10] eth/fetcher: fix log --- eth/fetcher/tx_fetcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index 369951150155..e088c30ce26e 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -328,7 +328,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) // out of sync with the chain or the peer is griefing us. if otherreject > 128/4 { delay = 200 * time.Millisecond - log.Warn("Peer delivering useless transactions, sleeping", "ignored", len(txs)-i) + log.Warn("Peer delivering useless transactions", "ignored", len(txs)-end, "peer", peer) break } } From 5aa9c1caa305bcc15b423461277a2b8b065359f4 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Fri, 19 Aug 2022 14:30:21 +0200 Subject: [PATCH 08/10] eth/fetcher: fix log --- eth/fetcher/tx_fetcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index e088c30ce26e..7c8f16df531f 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -328,7 +328,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) // out of sync with the chain or the peer is griefing us. if otherreject > 128/4 { delay = 200 * time.Millisecond - log.Warn("Peer delivering useless transactions", "ignored", len(txs)-end, "peer", peer) + log.Warn("Peer delivering useless transactions", "peer", peer, "ignored", len(txs)-end) break } } From 72684b812b80ed72ac05a1b5fdc4d3474045ffb8 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 19 Aug 2022 15:11:28 +0200 Subject: [PATCH 09/10] cmd/devp2p/internal/ethtest: fix ignorign tx announcements from prev. tests --- cmd/devp2p/internal/ethtest/helpers.go | 6 +++++- cmd/devp2p/internal/ethtest/suite.go | 4 ++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/cmd/devp2p/internal/ethtest/helpers.go b/cmd/devp2p/internal/ethtest/helpers.go index eeeb4f93cabf..b57649ade99d 100644 --- a/cmd/devp2p/internal/ethtest/helpers.go +++ b/cmd/devp2p/internal/ethtest/helpers.go @@ -357,9 +357,13 @@ func (s *Suite) waitAnnounce(conn *Conn, blockAnnouncement *NewBlock) error { return fmt.Errorf("wrong block hash in announcement: expected %v, got %v", blockAnnouncement.Block.Hash(), hashes[0].Hash) } return nil + + // ignore tx announcements from previous tests case *NewPooledTransactionHashes: - // ignore tx announcements from previous tests continue + case *Transactions: + continue + default: return fmt.Errorf("unexpected: %s", pretty.Sdump(msg)) } diff --git a/cmd/devp2p/internal/ethtest/suite.go b/cmd/devp2p/internal/ethtest/suite.go index 7059b4ba738c..4497478d72d6 100644 --- a/cmd/devp2p/internal/ethtest/suite.go +++ b/cmd/devp2p/internal/ethtest/suite.go @@ -544,9 +544,13 @@ func (s *Suite) TestNewPooledTxs(t *utesting.T) { t.Fatalf("unexpected number of txs requested: wanted %d, got %d", len(hashes), len(msg.GetPooledTransactionsPacket)) } return + // ignore propagated txs from previous tests case *NewPooledTransactionHashes: continue + case *Transactions: + continue + // ignore block announcements from previous tests case *NewBlockHashes: continue From 3b24887c278b556f9f6e5acbf7765d1dd6e69163 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 19 Aug 2022 15:15:54 +0200 Subject: [PATCH 10/10] cmd/devp2p/internal/ethtest: fix TestLargeTxRequest This increases the number of tx relay messages the test waits for. Since go-ethereum now processes incoming txs in smaller batches, the announcement messages it sends are also smaller. --- cmd/devp2p/internal/ethtest/transaction.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/devp2p/internal/ethtest/transaction.go b/cmd/devp2p/internal/ethtest/transaction.go index c4748bf8f7d8..baa55bd49268 100644 --- a/cmd/devp2p/internal/ethtest/transaction.go +++ b/cmd/devp2p/internal/ethtest/transaction.go @@ -29,7 +29,7 @@ import ( "github.com/ethereum/go-ethereum/params" ) -//var faucetAddr = common.HexToAddress("0x71562b71999873DB5b286dF957af199Ec94617F7") +// var faucetAddr = common.HexToAddress("0x71562b71999873DB5b286dF957af199Ec94617F7") var faucetKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") func (s *Suite) sendSuccessfulTxs(t *utesting.T) error { @@ -192,10 +192,10 @@ func sendMultipleSuccessfulTxs(t *utesting.T, s *Suite, txs []*types.Transaction nonce = txs[len(txs)-1].Nonce() // Wait for the transaction announcement(s) and make sure all sent txs are being propagated. - // all txs should be announced within 3 announcements. + // all txs should be announced within a couple announcements. recvHashes := make([]common.Hash, 0) - for i := 0; i < 3; i++ { + for i := 0; i < 20; i++ { switch msg := recvConn.readAndServe(s.chain, timeout).(type) { case *Transactions: for _, tx := range *msg {