diff --git a/eth/backend.go b/eth/backend.go index 02eb56ec0..98b5d4e6c 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -222,6 +222,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { checkpoint = params.TrustedCheckpoints[genesisHash] } if eth.handler, err = newHandler(&handlerConfig{ + NodeID: eth.p2pServer.Self().ID(), Database: chainDb, Chain: eth.blockchain, TxPool: eth.txPool, diff --git a/eth/handler.go b/eth/handler.go index 921492d91..8e0a4a8a0 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -28,6 +28,7 @@ import ( "github.com/scroll-tech/go-ethereum/core" "github.com/scroll-tech/go-ethereum/core/forkid" "github.com/scroll-tech/go-ethereum/core/types" + "github.com/scroll-tech/go-ethereum/crypto" "github.com/scroll-tech/go-ethereum/eth/downloader" "github.com/scroll-tech/go-ethereum/eth/fetcher" "github.com/scroll-tech/go-ethereum/eth/protocols/eth" @@ -36,13 +37,20 @@ import ( "github.com/scroll-tech/go-ethereum/event" "github.com/scroll-tech/go-ethereum/log" "github.com/scroll-tech/go-ethereum/p2p" + "github.com/scroll-tech/go-ethereum/p2p/enode" "github.com/scroll-tech/go-ethereum/params" + "golang.org/x/crypto/sha3" ) const ( // txChanSize is the size of channel listening to NewTxsEvent. // The number is referenced from the size of tx pool. txChanSize = 4096 + + // txMaxBroadcastSize is the max size of a transaction that will be broadcasted. + // All transactions with a higher size will be announced and need to be fetched + // by the peer. + txMaxBroadcastSize = 4096 ) var ( @@ -75,6 +83,7 @@ type txPool interface { // handlerConfig is the collection of initialization parameters to create a full // node network handler. type handlerConfig struct { + NodeID enode.ID // P2P node ID used for tx propagation topology Database ethdb.Database // Database for direct sync insertions Chain *core.BlockChain // Blockchain to serve data from TxPool txPool // Transaction pool to propagate from @@ -87,6 +96,7 @@ type handlerConfig struct { } type handler struct { + nodeID enode.ID networkID uint64 forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node @@ -128,6 +138,7 @@ func newHandler(config *handlerConfig) (*handler, error) { config.EventMux = new(event.TypeMux) // Nicety initialization for tests } h := &handler{ + nodeID: config.NodeID, networkID: config.Network, forkFilter: forkid.NewFilter(config.Chain), eventMux: config.EventMux, @@ -549,6 +560,9 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) { // already have the given transaction. func (h *handler) BroadcastTransactions(txs types.Transactions) { var ( + blobTxs int // Number of blob transactions to announce only + largeTxs int // Number of large transactions to announce only + annoCount int // Count of announcements made annoPeers int directCount int // Count of the txs sent directly to peers @@ -558,21 +572,63 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { annos = make(map[*ethPeer][]common.Hash) // Set peer->hash to announce ) + // Broadcast transactions to a batch of peers not knowing about it + direct := big.NewInt(int64(math.Sqrt(float64(h.peers.len())))) // Approximate number of peers to broadcast to + if direct.BitLen() == 0 { + direct = big.NewInt(1) + } + total := new(big.Int).Exp(direct, big.NewInt(2), nil) // Stabilise total peer count a bit based on sqrt peers + + var ( + signer = types.LatestSignerForChainID(h.chain.Config().ChainID) // Don't care about chain status, we just need *a* sender + hasher = sha3.NewLegacyKeccak256().(crypto.KeccakState) + hash = make([]byte, 32) + ) + for _, tx := range txs { // L1 messages are not broadcast to peers if tx.IsL1MessageTx() { continue } - peers := h.peers.peersWithoutTransaction(tx.Hash()) - // Send the tx unconditionally to a subset of our peers - numDirect := int(math.Sqrt(float64(len(peers)))) - for _, peer := range peers[:numDirect] { - txset[peer] = append(txset[peer], tx.Hash()) + + var maybeDirect bool + switch { + case tx.Type() == types.BlobTxType: + blobTxs++ + case tx.Size() > txMaxBroadcastSize: + largeTxs++ + default: + maybeDirect = true } - // For the remaining peers, send announcement only - for _, peer := range peers[numDirect:] { - annos[peer] = append(annos[peer], tx.Hash()) + + // Send the transaction (if it's small enough) directly to a subset of + // the peers that have not received it yet, ensuring that the flow of + // transactions is groupped by account to (try and) avoid nonce gaps. + // + // To do this, we hash the local enode IW with together with a peer's + // enode ID together with the transaction sender and broadcast if + // `sha(self, peer, sender) mod peers < sqrt(peers)`. + for _, peer := range h.peers.peersWithoutTransaction(tx.Hash()) { + var broadcast bool + if maybeDirect { + hasher.Reset() + hasher.Write(h.nodeID.Bytes()) + hasher.Write(peer.Node().ID().Bytes()) + + from, _ := types.Sender(signer, tx) // Ignore error, we only use the addr as a propagation target splitter + hasher.Write(from.Bytes()) + + hasher.Read(hash) + if new(big.Int).Mod(new(big.Int).SetBytes(hash), total).Cmp(direct) < 0 { + broadcast = true + } + } + if broadcast { + txset[peer] = append(txset[peer], tx.Hash()) + } else { + annos[peer] = append(annos[peer], tx.Hash()) + } } } for peer, hashes := range txset { @@ -585,9 +641,8 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { annoCount += len(hashes) peer.AsyncSendPooledTransactionHashes(hashes) } - log.Debug("Transaction broadcast", "txs", len(txs), - "announce packs", annoPeers, "announced hashes", annoCount, - "tx packs", directPeers, "broadcast txs", directCount) + log.Debug("Distributed transactions", "plaintxs", len(txs)-blobTxs-largeTxs, "blobtxs", blobTxs, "largetxs", largeTxs, + "bcastpeers", directPeers, "bcastcount", directCount, "annpeers", annoPeers, "anncount", annoCount) } // minedBroadcastLoop sends mined blocks to connected peers.