Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "eth/fetcher: don't spend too much time on transaction inclusion" #25567

Merged
merged 1 commit into from
Aug 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions cmd/devp2p/internal/ethtest/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,13 +357,9 @@ func (s *Suite) waitAnnounce(conn *Conn, blockAnnouncement *NewBlock) error {
return fmt.Errorf("wrong block hash in announcement: expected %v, got %v", blockAnnouncement.Block.Hash(), hashes[0].Hash)
}
return nil

// ignore tx announcements from previous tests
case *NewPooledTransactionHashes:
// ignore tx announcements from previous tests
continue
case *Transactions:
continue

default:
return fmt.Errorf("unexpected: %s", pretty.Sdump(msg))
}
Expand Down
4 changes: 0 additions & 4 deletions cmd/devp2p/internal/ethtest/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,13 +544,9 @@ func (s *Suite) TestNewPooledTxs(t *utesting.T) {
t.Fatalf("unexpected number of txs requested: wanted %d, got %d", len(hashes), len(msg.GetPooledTransactionsPacket))
}
return

// ignore propagated txs from previous tests
case *NewPooledTransactionHashes:
continue
case *Transactions:
continue

// ignore block announcements from previous tests
case *NewBlockHashes:
continue
Expand Down
6 changes: 3 additions & 3 deletions cmd/devp2p/internal/ethtest/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"github.com/ethereum/go-ethereum/params"
)

// var faucetAddr = common.HexToAddress("0x71562b71999873DB5b286dF957af199Ec94617F7")
//var faucetAddr = common.HexToAddress("0x71562b71999873DB5b286dF957af199Ec94617F7")
var faucetKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")

func (s *Suite) sendSuccessfulTxs(t *utesting.T) error {
Expand Down Expand Up @@ -192,10 +192,10 @@ func sendMultipleSuccessfulTxs(t *utesting.T, s *Suite, txs []*types.Transaction
nonce = txs[len(txs)-1].Nonce()

// Wait for the transaction announcement(s) and make sure all sent txs are being propagated.
// all txs should be announced within a couple announcements.
// all txs should be announced within 3 announcements.
recvHashes := make([]common.Hash, 0)

for i := 0; i < 20; i++ {
for i := 0; i < 3; i++ {
switch msg := recvConn.readAndServe(s.chain, timeout).(type) {
case *Transactions:
for _, tx := range *msg {
Expand Down
98 changes: 38 additions & 60 deletions eth/fetcher/tx_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,79 +262,57 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
// direct request replies. The differentiation is important so the fetcher can
// re-schedule missing transactions as soon as possible.
func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) error {
var (
inMeter = txReplyInMeter
knownMeter = txReplyKnownMeter
underpricedMeter = txReplyUnderpricedMeter
otherRejectMeter = txReplyOtherRejectMeter
)
if !direct {
inMeter = txBroadcastInMeter
knownMeter = txBroadcastKnownMeter
underpricedMeter = txBroadcastUnderpricedMeter
otherRejectMeter = txBroadcastOtherRejectMeter
}
// Keep track of all the propagated transactions
inMeter.Mark(int64(len(txs)))

if direct {
txReplyInMeter.Mark(int64(len(txs)))
} else {
txBroadcastInMeter.Mark(int64(len(txs)))
}
// Push all the transactions into the pool, tracking underpriced ones to avoid
// re-requesting them and dropping the peer in case of malicious transfers.
var (
added = make([]common.Hash, 0, len(txs))
delay time.Duration
added = make([]common.Hash, 0, len(txs))
duplicate int64
underpriced int64
otherreject int64
)
// proceed in batches
for i := 0; i < len(txs); i += 128 {
end := i + 128
if end > len(txs) {
end = len(txs)
}
var (
duplicate int64
underpriced int64
otherreject int64
)
batch := txs[i:end]
for j, err := range f.addTxs(batch) {
// Track the transaction hash if the price is too low for us.
// Avoid re-request this transaction when we receive another
// announcement.
if errors.Is(err, core.ErrUnderpriced) || errors.Is(err, core.ErrReplaceUnderpriced) {
for f.underpriced.Cardinality() >= maxTxUnderpricedSetSize {
f.underpriced.Pop()
}
f.underpriced.Add(batch[j].Hash())
errs := f.addTxs(txs)
for i, err := range errs {
// Track the transaction hash if the price is too low for us.
// Avoid re-request this transaction when we receive another
// announcement.
if errors.Is(err, core.ErrUnderpriced) || errors.Is(err, core.ErrReplaceUnderpriced) {
for f.underpriced.Cardinality() >= maxTxUnderpricedSetSize {
f.underpriced.Pop()
}
// Track a few interesting failure types
switch {
case err == nil: // Noop, but need to handle to not count these
f.underpriced.Add(txs[i].Hash())
}
// Track a few interesting failure types
switch {
case err == nil: // Noop, but need to handle to not count these

case errors.Is(err, core.ErrAlreadyKnown):
duplicate++
case errors.Is(err, core.ErrAlreadyKnown):
duplicate++

case errors.Is(err, core.ErrUnderpriced) || errors.Is(err, core.ErrReplaceUnderpriced):
underpriced++
case errors.Is(err, core.ErrUnderpriced) || errors.Is(err, core.ErrReplaceUnderpriced):
underpriced++

default:
otherreject++
}
added = append(added, batch[j].Hash())
}
knownMeter.Mark(duplicate)
underpricedMeter.Mark(underpriced)
otherRejectMeter.Mark(otherreject)

// If 'other reject' is >25% of the deliveries in any batch, abort. Either we are
// out of sync with the chain or the peer is griefing us.
if otherreject > 128/4 {
delay = 200 * time.Millisecond
log.Warn("Peer delivering useless transactions", "peer", peer, "ignored", len(txs)-end)
break
default:
otherreject++
}
added = append(added, txs[i].Hash())
}
if direct {
txReplyKnownMeter.Mark(duplicate)
txReplyUnderpricedMeter.Mark(underpriced)
txReplyOtherRejectMeter.Mark(otherreject)
} else {
txBroadcastKnownMeter.Mark(duplicate)
txBroadcastUnderpricedMeter.Mark(underpriced)
txBroadcastOtherRejectMeter.Mark(otherreject)
}
select {
case f.cleanup <- &txDelivery{origin: peer, hashes: added, direct: direct}:
time.Sleep(delay)
return nil
case <-f.quit:
return errTerminated
Expand Down