Skip to content

Commit

Permalink
make transaction propagation paths in the network deterministic (#109)
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanmorphl2 authored Jul 8, 2024
1 parent 75a2b01 commit ce4be6c
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 11 deletions.
1 change: 1 addition & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
checkpoint = params.TrustedCheckpoints[genesisHash]
}
if eth.handler, err = newHandler(&handlerConfig{
NodeID: eth.p2pServer.Self().ID(),
Database: chainDb,
Chain: eth.blockchain,
TxPool: eth.txPool,
Expand Down
77 changes: 66 additions & 11 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/scroll-tech/go-ethereum/core"
"github.com/scroll-tech/go-ethereum/core/forkid"
"github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/crypto"
"github.com/scroll-tech/go-ethereum/eth/downloader"
"github.com/scroll-tech/go-ethereum/eth/fetcher"
"github.com/scroll-tech/go-ethereum/eth/protocols/eth"
Expand All @@ -36,13 +37,20 @@ import (
"github.com/scroll-tech/go-ethereum/event"
"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/p2p"
"github.com/scroll-tech/go-ethereum/p2p/enode"
"github.com/scroll-tech/go-ethereum/params"
"golang.org/x/crypto/sha3"
)

const (
// txChanSize is the size of channel listening to NewTxsEvent.
// The number is referenced from the size of tx pool.
txChanSize = 4096

// txMaxBroadcastSize is the max size of a transaction that will be broadcasted.
// All transactions with a higher size will be announced and need to be fetched
// by the peer.
txMaxBroadcastSize = 4096
)

var (
Expand Down Expand Up @@ -75,6 +83,7 @@ type txPool interface {
// handlerConfig is the collection of initialization parameters to create a full
// node network handler.
type handlerConfig struct {
NodeID enode.ID // P2P node ID used for tx propagation topology
Database ethdb.Database // Database for direct sync insertions
Chain *core.BlockChain // Blockchain to serve data from
TxPool txPool // Transaction pool to propagate from
Expand All @@ -87,6 +96,7 @@ type handlerConfig struct {
}

type handler struct {
nodeID enode.ID
networkID uint64
forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node

Expand Down Expand Up @@ -128,6 +138,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
config.EventMux = new(event.TypeMux) // Nicety initialization for tests
}
h := &handler{
nodeID: config.NodeID,
networkID: config.Network,
forkFilter: forkid.NewFilter(config.Chain),
eventMux: config.EventMux,
Expand Down Expand Up @@ -549,6 +560,9 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
// already have the given transaction.
func (h *handler) BroadcastTransactions(txs types.Transactions) {
var (
blobTxs int // Number of blob transactions to announce only
largeTxs int // Number of large transactions to announce only

annoCount int // Count of announcements made
annoPeers int
directCount int // Count of the txs sent directly to peers
Expand All @@ -558,21 +572,63 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
annos = make(map[*ethPeer][]common.Hash) // Set peer->hash to announce

)

// Broadcast transactions to a batch of peers not knowing about it
direct := big.NewInt(int64(math.Sqrt(float64(h.peers.len())))) // Approximate number of peers to broadcast to
if direct.BitLen() == 0 {
direct = big.NewInt(1)
}
total := new(big.Int).Exp(direct, big.NewInt(2), nil) // Stabilise total peer count a bit based on sqrt peers

var (
signer = types.LatestSignerForChainID(h.chain.Config().ChainID) // Don't care about chain status, we just need *a* sender
hasher = sha3.NewLegacyKeccak256().(crypto.KeccakState)
hash = make([]byte, 32)
)

for _, tx := range txs {
// L1 messages are not broadcast to peers
if tx.IsL1MessageTx() {
continue
}
peers := h.peers.peersWithoutTransaction(tx.Hash())
// Send the tx unconditionally to a subset of our peers
numDirect := int(math.Sqrt(float64(len(peers))))
for _, peer := range peers[:numDirect] {
txset[peer] = append(txset[peer], tx.Hash())

var maybeDirect bool
switch {
case tx.Type() == types.BlobTxType:
blobTxs++
case tx.Size() > txMaxBroadcastSize:
largeTxs++
default:
maybeDirect = true
}
// For the remaining peers, send announcement only
for _, peer := range peers[numDirect:] {
annos[peer] = append(annos[peer], tx.Hash())

// Send the transaction (if it's small enough) directly to a subset of
// the peers that have not received it yet, ensuring that the flow of
// transactions is groupped by account to (try and) avoid nonce gaps.
//
// To do this, we hash the local enode IW with together with a peer's
// enode ID together with the transaction sender and broadcast if
// `sha(self, peer, sender) mod peers < sqrt(peers)`.
for _, peer := range h.peers.peersWithoutTransaction(tx.Hash()) {
var broadcast bool
if maybeDirect {
hasher.Reset()
hasher.Write(h.nodeID.Bytes())
hasher.Write(peer.Node().ID().Bytes())

from, _ := types.Sender(signer, tx) // Ignore error, we only use the addr as a propagation target splitter
hasher.Write(from.Bytes())

hasher.Read(hash)
if new(big.Int).Mod(new(big.Int).SetBytes(hash), total).Cmp(direct) < 0 {
broadcast = true
}
}
if broadcast {
txset[peer] = append(txset[peer], tx.Hash())
} else {
annos[peer] = append(annos[peer], tx.Hash())
}
}
}
for peer, hashes := range txset {
Expand All @@ -585,9 +641,8 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
annoCount += len(hashes)
peer.AsyncSendPooledTransactionHashes(hashes)
}
log.Debug("Transaction broadcast", "txs", len(txs),
"announce packs", annoPeers, "announced hashes", annoCount,
"tx packs", directPeers, "broadcast txs", directCount)
log.Debug("Distributed transactions", "plaintxs", len(txs)-blobTxs-largeTxs, "blobtxs", blobTxs, "largetxs", largeTxs,
"bcastpeers", directPeers, "bcastcount", directCount, "annpeers", annoPeers, "anncount", annoCount)
}

// minedBroadcastLoop sends mined blocks to connected peers.
Expand Down

0 comments on commit ce4be6c

Please sign in to comment.