From 4f7f0084ff80d7ea23a5f18b6ed636cd517c56bc Mon Sep 17 00:00:00 2001 From: GalaIO Date: Fri, 21 Jul 2023 14:53:25 +0800 Subject: [PATCH] eth/protocols/handler: add packet sending condition, prevent send small packets frequently; --- eth/protocols/eth/broadcast.go | 30 ++++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/eth/protocols/eth/broadcast.go b/eth/protocols/eth/broadcast.go index 3f75b044ef..cc1e725e84 100644 --- a/eth/protocols/eth/broadcast.go +++ b/eth/protocols/eth/broadcast.go @@ -18,6 +18,7 @@ package eth import ( "math/big" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/gopool" @@ -27,7 +28,9 @@ import ( const ( // This is the target size for the packs of transactions or announcements. A // pack can get larger than this if a single transactions exceeds this size. - maxTxPacketSize = 100 * 1024 + maxTxPacketSize = 100 * 1024 + minTxPacketSize = 1 * 1024 + sendPacketTimeout = 300 * time.Millisecond ) // blockPropagation is a block propagation event, waiting for its turn in the @@ -136,14 +139,15 @@ func (p *Peer) broadcastTransactions() { // node internals and at the same time rate limits queued data. func (p *Peer) announceTransactions() { var ( - queue []common.Hash // Queue of hashes to announce as transaction stubs - done chan struct{} // Non-nil if background announcer is running - fail = make(chan error, 1) // Channel used to receive network error - failed bool // Flag whether a send failed, discard everything onward + queue []common.Hash // Queue of hashes to announce as transaction stubs + done chan struct{} // Non-nil if background announcer is running + fail = make(chan error, 1) // Channel used to receive network error + failed bool // Flag whether a send failed, discard everything onward + lastSentTime = time.Now() ) for { // If there's no in-flight announce running, check if a new one is needed - if done == nil && len(queue) > 0 { + if done == nil && triggerPacketSending(len(queue)*common.HashLength, lastSentTime) { // Pile transaction hashes until we reach our allowed network limit var ( count int @@ -170,6 +174,7 @@ func (p *Peer) announceTransactions() { close(done) //p.Log().Trace("Sent transaction announcements", "count", len(pending)) }) + lastSentTime = time.Now() } } // Transfer goroutine may or may not have been started, listen for events @@ -200,3 +205,16 @@ func (p *Peer) announceTransactions() { } } } + +// triggerPacketSending if packet reach minTxPacketSize or sendPacketTimeout, it will trigger packet sending +// to prevent only small packets sent frequently in network +func triggerPacketSending(estimateSize int, lastSentTime time.Time) bool { + if estimateSize >= minTxPacketSize { + return true + } + + if time.Since(lastSentTime) >= sendPacketTimeout && estimateSize > 0 { + return true + } + return false +}