From 610f6a5e633638794d1c215c74d87e2ce13e2f42 Mon Sep 17 00:00:00 2001 From: yutianwu Date: Wed, 13 Oct 2021 11:30:19 +0800 Subject: [PATCH] [R4R] add extension in eth protocol handshake to disable tx broadcast (#412) * add extension for eth protocol handshake * fix comments --- eth/backend.go | 23 ++++++----- eth/downloader/peer.go | 8 ++-- eth/ethconfig/config.go | 5 ++- eth/ethconfig/gen_config.go | 6 +++ eth/handler.go | 55 +++++++++++++------------ eth/handler_eth_test.go | 10 ++--- eth/protocols/eth/broadcast.go | 6 +++ eth/protocols/eth/handshake.go | 62 ++++++++++++++++++++++++++++- eth/protocols/eth/handshake_test.go | 2 +- eth/protocols/eth/peer.go | 33 ++++++++++++--- eth/protocols/eth/protocol.go | 40 ++++++++++++++++++- 11 files changed, 193 insertions(+), 57 deletions(-) diff --git a/eth/backend.go b/eth/backend.go index bdae4b4235dd..f6599529db1f 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -231,17 +231,18 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { } if eth.handler, err = newHandler(&handlerConfig{ - Database: chainDb, - Chain: eth.blockchain, - TxPool: eth.txPool, - Network: config.NetworkId, - Sync: config.SyncMode, - BloomCache: uint64(cacheLimit), - EventMux: eth.eventMux, - Checkpoint: checkpoint, - Whitelist: config.Whitelist, - DirectBroadcast: config.DirectBroadcast, - DiffSync: config.DiffSync, + Database: chainDb, + Chain: eth.blockchain, + TxPool: eth.txPool, + Network: config.NetworkId, + Sync: config.SyncMode, + BloomCache: uint64(cacheLimit), + EventMux: eth.eventMux, + Checkpoint: checkpoint, + Whitelist: config.Whitelist, + DirectBroadcast: config.DirectBroadcast, + DiffSync: config.DiffSync, + DisablePeerTxBroadcast: config.DisablePeerTxBroadcast, }); err != nil { return nil, err } diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 4d76988f7102..297ba2fa552a 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -457,7 +457,7 @@ func (ps *peerSet) HeaderIdlePeers() ([]*peerConnection, int) { defer p.lock.RUnlock() return p.headerThroughput } - return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput) + return ps.idlePeers(eth.ETH65, eth.ETH67, idle, throughput) } // BodyIdlePeers retrieves a flat list of all the currently body-idle peers within @@ -471,7 +471,7 @@ func (ps *peerSet) BodyIdlePeers() ([]*peerConnection, int) { defer p.lock.RUnlock() return p.blockThroughput } - return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput) + return ps.idlePeers(eth.ETH65, eth.ETH67, idle, throughput) } // ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers @@ -485,7 +485,7 @@ func (ps *peerSet) ReceiptIdlePeers() ([]*peerConnection, int) { defer p.lock.RUnlock() return p.receiptThroughput } - return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput) + return ps.idlePeers(eth.ETH65, eth.ETH67, idle, throughput) } // NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle @@ -499,7 +499,7 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peerConnection, int) { defer p.lock.RUnlock() return p.stateThroughput } - return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput) + return ps.idlePeers(eth.ETH65, eth.ETH67, idle, throughput) } // idlePeers retrieves a flat list of all currently idle peers satisfying the diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index bf143ba02c72..83db7fc99893 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -124,8 +124,9 @@ type Config struct { Genesis *core.Genesis `toml:",omitempty"` // Protocol options - NetworkId uint64 // Network ID to use for selecting peers to connect to - SyncMode downloader.SyncMode + NetworkId uint64 // Network ID to use for selecting peers to connect to + SyncMode downloader.SyncMode + DisablePeerTxBroadcast bool // This can be set to list of enrtree:// URLs which will be queried for // for nodes to connect to. diff --git a/eth/ethconfig/gen_config.go b/eth/ethconfig/gen_config.go index 258ade229367..f192a1aacee8 100644 --- a/eth/ethconfig/gen_config.go +++ b/eth/ethconfig/gen_config.go @@ -20,6 +20,7 @@ func (c Config) MarshalTOML() (interface{}, error) { Genesis *core.Genesis `toml:",omitempty"` NetworkId uint64 SyncMode downloader.SyncMode + DisablePeerTxBroadcast bool EthDiscoveryURLs []string SnapDiscoveryURLs []string NoPruning bool @@ -68,6 +69,7 @@ func (c Config) MarshalTOML() (interface{}, error) { enc.Genesis = c.Genesis enc.NetworkId = c.NetworkId enc.SyncMode = c.SyncMode + enc.DisablePeerTxBroadcast = c.DisablePeerTxBroadcast enc.EthDiscoveryURLs = c.EthDiscoveryURLs enc.SnapDiscoveryURLs = c.SnapDiscoveryURLs enc.NoPruning = c.NoPruning @@ -119,6 +121,7 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { Genesis *core.Genesis `toml:",omitempty"` NetworkId *uint64 SyncMode *downloader.SyncMode + DisablePeerTxBroadcast *bool EthDiscoveryURLs []string SnapDiscoveryURLs []string NoPruning *bool @@ -176,6 +179,9 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { if dec.SyncMode != nil { c.SyncMode = *dec.SyncMode } + if dec.DisablePeerTxBroadcast != nil { + c.DisablePeerTxBroadcast = *dec.DisablePeerTxBroadcast + } if dec.EthDiscoveryURLs != nil { c.EthDiscoveryURLs = dec.EthDiscoveryURLs } diff --git a/eth/handler.go b/eth/handler.go index 41b459d2d5f6..f00f955b348b 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -78,22 +78,24 @@ type txPool interface { // handlerConfig is the collection of initialization parameters to create a full // node network handler. type handlerConfig struct { - Database ethdb.Database // Database for direct sync insertions - Chain *core.BlockChain // Blockchain to serve data from - TxPool txPool // Transaction pool to propagate from - Network uint64 // Network identifier to adfvertise - Sync downloader.SyncMode // Whether to fast or full sync - DiffSync bool // Whether to diff sync - BloomCache uint64 // Megabytes to alloc for fast sync bloom - EventMux *event.TypeMux // Legacy event mux, deprecate for `feed` - Checkpoint *params.TrustedCheckpoint // Hard coded checkpoint for sync challenges - Whitelist map[uint64]common.Hash // Hard coded whitelist for sync challenged - DirectBroadcast bool + Database ethdb.Database // Database for direct sync insertions + Chain *core.BlockChain // Blockchain to serve data from + TxPool txPool // Transaction pool to propagate from + Network uint64 // Network identifier to adfvertise + Sync downloader.SyncMode // Whether to fast or full sync + DiffSync bool // Whether to diff sync + BloomCache uint64 // Megabytes to alloc for fast sync bloom + EventMux *event.TypeMux // Legacy event mux, deprecate for `feed` + Checkpoint *params.TrustedCheckpoint // Hard coded checkpoint for sync challenges + Whitelist map[uint64]common.Hash // Hard coded whitelist for sync challenged + DirectBroadcast bool + DisablePeerTxBroadcast bool } type handler struct { - networkID uint64 - forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node + networkID uint64 + forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node + disablePeerTxBroadcast bool fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks) snapSync uint32 // Flag whether fast sync should operate on top of the snap protocol @@ -138,18 +140,19 @@ func newHandler(config *handlerConfig) (*handler, error) { config.EventMux = new(event.TypeMux) // Nicety initialization for tests } h := &handler{ - networkID: config.Network, - forkFilter: forkid.NewFilter(config.Chain), - eventMux: config.EventMux, - database: config.Database, - txpool: config.TxPool, - chain: config.Chain, - peers: newPeerSet(), - whitelist: config.Whitelist, - directBroadcast: config.DirectBroadcast, - diffSync: config.DiffSync, - txsyncCh: make(chan *txsync), - quitSync: make(chan struct{}), + networkID: config.Network, + forkFilter: forkid.NewFilter(config.Chain), + disablePeerTxBroadcast: config.DisablePeerTxBroadcast, + eventMux: config.EventMux, + database: config.Database, + txpool: config.TxPool, + chain: config.Chain, + peers: newPeerSet(), + whitelist: config.Whitelist, + directBroadcast: config.DirectBroadcast, + diffSync: config.DiffSync, + txsyncCh: make(chan *txsync), + quitSync: make(chan struct{}), } if config.Sync == downloader.FullSync { // The database seems empty as the current block is the genesis. Yet the fast @@ -276,7 +279,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { td = h.chain.GetTd(hash, number) ) forkID := forkid.NewID(h.chain.Config(), h.chain.Genesis().Hash(), h.chain.CurrentHeader().Number.Uint64()) - if err := peer.Handshake(h.networkID, td, hash, genesis.Hash(), forkID, h.forkFilter); err != nil { + if err := peer.Handshake(h.networkID, td, hash, genesis.Hash(), forkID, h.forkFilter, ð.UpgradeStatusExtension{DisablePeerTxBroadcast: h.disablePeerTxBroadcast}); err != nil { peer.Log().Debug("Ethereum handshake failed", "err", err) return err } diff --git a/eth/handler_eth_test.go b/eth/handler_eth_test.go index 1d38e3b66663..271bae07c763 100644 --- a/eth/handler_eth_test.go +++ b/eth/handler_eth_test.go @@ -271,7 +271,7 @@ func testRecvTransactions(t *testing.T, protocol uint) { head = handler.chain.CurrentBlock() td = handler.chain.GetTd(head.Hash(), head.NumberU64()) ) - if err := src.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain)); err != nil { + if err := src.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil { t.Fatalf("failed to run protocol handshake") } // Send the transaction to the sink and verify that it's added to the tx pool @@ -333,7 +333,7 @@ func testSendTransactions(t *testing.T, protocol uint) { head = handler.chain.CurrentBlock() td = handler.chain.GetTd(head.Hash(), head.NumberU64()) ) - if err := sink.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain)); err != nil { + if err := sink.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil { t.Fatalf("failed to run protocol handshake") } // After the handshake completes, the source handler should stream the sink @@ -532,7 +532,7 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo head = handler.chain.CurrentBlock() td = handler.chain.GetTd(head.Hash(), head.NumberU64()) ) - if err := remote.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain)); err != nil { + if err := remote.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil { t.Fatalf("failed to run protocol handshake") } // Connect a new peer and check that we receive the checkpoint challenge @@ -616,7 +616,7 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) { go source.handler.runEthPeer(sourcePeer, func(peer *eth.Peer) error { return eth.Handle((*ethHandler)(source.handler), peer) }) - if err := sinkPeer.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain)); err != nil { + if err := sinkPeer.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain), nil); err != nil { t.Fatalf("failed to run protocol handshake") } go eth.Handle(sink, sinkPeer) @@ -689,7 +689,7 @@ func testBroadcastMalformedBlock(t *testing.T, protocol uint) { genesis = source.chain.Genesis() td = source.chain.GetTd(genesis.Hash(), genesis.NumberU64()) ) - if err := sink.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain)); err != nil { + if err := sink.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain), nil); err != nil { t.Fatalf("failed to run protocol handshake") } // After the handshake completes, the source handler should stream the sink diff --git a/eth/protocols/eth/broadcast.go b/eth/protocols/eth/broadcast.go index e0ee2a1cfaed..132eac0102ff 100644 --- a/eth/protocols/eth/broadcast.go +++ b/eth/protocols/eth/broadcast.go @@ -122,6 +122,9 @@ func (p *Peer) broadcastTransactions() { case <-fail: failed = true + case <-p.txTerm: + return + case <-p.term: return } @@ -189,6 +192,9 @@ func (p *Peer) announceTransactions() { case <-fail: failed = true + case <-p.txTerm: + return + case <-p.term: return } diff --git a/eth/protocols/eth/handshake.go b/eth/protocols/eth/handshake.go index b634f18e00b2..d604f045f48b 100644 --- a/eth/protocols/eth/handshake.go +++ b/eth/protocols/eth/handshake.go @@ -35,7 +35,7 @@ const ( // Handshake executes the eth protocol handshake, negotiating version number, // network IDs, difficulties, head and genesis blocks. -func (p *Peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis common.Hash, forkID forkid.ID, forkFilter forkid.Filter) error { +func (p *Peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis common.Hash, forkID forkid.ID, forkFilter forkid.Filter, extension *UpgradeStatusExtension) error { // Send out own handshake in a new thread errc := make(chan error, 2) @@ -68,6 +68,49 @@ func (p *Peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis } p.td, p.head = status.TD, status.Head + if p.version >= ETH67 { + var upgradeStatus UpgradeStatusPacket // safe to read after two values have been received from errc + if extension == nil { + extension = &UpgradeStatusExtension{} + } + extensionRaw, err := extension.Encode() + if err != nil { + return err + } + + gopool.Submit(func() { + errc <- p2p.Send(p.rw, UpgradeStatusMsg, &UpgradeStatusPacket{ + Extension: extensionRaw, + }) + }) + gopool.Submit(func() { + errc <- p.readUpgradeStatus(&upgradeStatus) + }) + timeout := time.NewTimer(handshakeTimeout) + defer timeout.Stop() + for i := 0; i < 2; i++ { + select { + case err := <-errc: + if err != nil { + return err + } + case <-timeout.C: + return p2p.DiscReadTimeout + } + } + + extension, err := upgradeStatus.GetExtension() + if err != nil { + return err + } + p.statusExtension = extension + + if p.statusExtension.DisablePeerTxBroadcast { + p.Log().Debug("peer does not need broadcast txs, closing broadcast routines") + p.CloseTxBroadcast() + } + } + // TD at mainnet block #7753254 is 76 bits. If it becomes 100 million times // larger, it will still fit within 100 bits if tdlen := p.td.BitLen(); tdlen > 100 { @@ -106,3 +149,20 @@ func (p *Peer) readStatus(network uint64, status *StatusPacket, genesis common.H } return nil } + +func (p *Peer) readUpgradeStatus(status *UpgradeStatusPacket) error { + msg, err := p.rw.ReadMsg() + if err != nil { + return err + } + if msg.Code != UpgradeStatusMsg { + return fmt.Errorf("%w: upgrade status msg has code %x (!= %x)", errNoStatusMsg, msg.Code, UpgradeStatusMsg) + } + if msg.Size > maxMessageSize { + return fmt.Errorf("%w: %v > %v", errMsgTooLarge, msg.Size, maxMessageSize) + } + if err := msg.Decode(&status); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + return nil +} diff --git a/eth/protocols/eth/handshake_test.go b/eth/protocols/eth/handshake_test.go index 3bebda2dcc9b..8a433f3ce45d 100644 --- a/eth/protocols/eth/handshake_test.go +++ b/eth/protocols/eth/handshake_test.go @@ -81,7 +81,7 @@ func testHandshake(t *testing.T, protocol uint) { // Send the junk test with one peer, check the handshake failure go p2p.Send(app, test.code, test.data) - err := peer.Handshake(1, td, head.Hash(), genesis.Hash(), forkID, forkid.NewFilter(backend.chain)) + err := peer.Handshake(1, td, head.Hash(), genesis.Hash(), forkID, forkid.NewFilter(backend.chain), nil) if err == nil { t.Errorf("test %d: protocol returned nil error, want %q", i, test.want) } else if !errors.Is(err, test.want) { diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index e619c183ba2f..7ab4fa1a36c2 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -22,6 +22,7 @@ import ( "sync" mapset "github.com/deckarep/golang-set" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/p2p" @@ -68,9 +69,10 @@ func max(a, b int) int { type Peer struct { id string // Unique ID for the peer, cached - *p2p.Peer // The embedded P2P package peer - rw p2p.MsgReadWriter // Input/output streams for snap - version uint // Protocol version negotiated + *p2p.Peer // The embedded P2P package peer + rw p2p.MsgReadWriter // Input/output streams for snap + version uint // Protocol version negotiated + statusExtension *UpgradeStatusExtension head common.Hash // Latest advertised head block hash td *big.Int // Latest advertised head block total difficulty @@ -84,8 +86,9 @@ type Peer struct { txBroadcast chan []common.Hash // Channel used to queue transaction propagation requests txAnnounce chan []common.Hash // Channel used to queue transaction announcement requests - term chan struct{} // Termination channel to stop the broadcasters - lock sync.RWMutex // Mutex protecting the internal fields + term chan struct{} // Termination channel to stop the broadcasters + txTerm chan struct{} // Termination channel to stop the tx broadcasters + lock sync.RWMutex // Mutex protecting the internal fields } // NewPeer create a wrapper for a network connection and negotiated protocol @@ -104,6 +107,7 @@ func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Pe txAnnounce: make(chan []common.Hash), txpool: txpool, term: make(chan struct{}), + txTerm: make(chan struct{}), } // Start up all the broadcasters go peer.broadcastBlocks() @@ -119,6 +123,17 @@ func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Pe // clean it up! func (p *Peer) Close() { close(p.term) + + p.CloseTxBroadcast() +} + +// CloseTxBroadcast signals the tx broadcast goroutine to terminate. +func (p *Peer) CloseTxBroadcast() { + select { + case <-p.txTerm: + default: + close(p.txTerm) + } } // ID retrieves the peer's unique identifier. @@ -212,6 +227,10 @@ func (p *Peer) AsyncSendTransactions(hashes []common.Hash) { for _, hash := range hashes { p.knownTxs.Add(hash) } + + case <-p.txTerm: + p.Log().Debug("Dropping transaction propagation", "count", len(hashes)) + case <-p.term: p.Log().Debug("Dropping transaction propagation", "count", len(hashes)) } @@ -247,6 +266,10 @@ func (p *Peer) AsyncSendPooledTransactionHashes(hashes []common.Hash) { for _, hash := range hashes { p.knownTxs.Add(hash) } + + case <-p.txTerm: + p.Log().Debug("Dropping transaction announcement", "count", len(hashes)) + case <-p.term: p.Log().Debug("Dropping transaction announcement", "count", len(hashes)) } diff --git a/eth/protocols/eth/protocol.go b/eth/protocols/eth/protocol.go index de1b0ed1ee7f..3e0e0cf6eddc 100644 --- a/eth/protocols/eth/protocol.go +++ b/eth/protocols/eth/protocol.go @@ -32,6 +32,7 @@ import ( const ( ETH65 = 65 ETH66 = 66 + ETH67 = 67 ) // ProtocolName is the official short name of the `eth` protocol used during @@ -40,11 +41,11 @@ const ProtocolName = "eth" // ProtocolVersions are the supported versions of the `eth` protocol (first // is primary). -var ProtocolVersions = []uint{ETH66, ETH65} +var ProtocolVersions = []uint{ETH67, ETH66, ETH65} // protocolLengths are the number of implemented message corresponding to // different protocol versions. -var protocolLengths = map[uint]uint64{ETH66: 17, ETH65: 17} +var protocolLengths = map[uint]uint64{ETH67: 18, ETH66: 17, ETH65: 17} // maxMessageSize is the maximum cap on the size of a protocol message. const maxMessageSize = 10 * 1024 * 1024 @@ -68,6 +69,9 @@ const ( NewPooledTransactionHashesMsg = 0x08 GetPooledTransactionsMsg = 0x09 PooledTransactionsMsg = 0x0a + + // Protocol messages overloaded in eth/66 + UpgradeStatusMsg = 0x0b ) var ( @@ -97,6 +101,35 @@ type StatusPacket struct { ForkID forkid.ID } +type UpgradeStatusExtension struct { + DisablePeerTxBroadcast bool +} + +func (e *UpgradeStatusExtension) Encode() (*rlp.RawValue, error) { + rawBytes, err := rlp.EncodeToBytes(e) + if err != nil { + return nil, err + } + raw := rlp.RawValue(rawBytes) + return &raw, nil +} + +type UpgradeStatusPacket struct { + Extension *rlp.RawValue `rlp:"nil"` +} + +func (p *UpgradeStatusPacket) GetExtension() (*UpgradeStatusExtension, error) { + extension := &UpgradeStatusExtension{} + if p.Extension == nil { + return extension, nil + } + err := rlp.DecodeBytes(*p.Extension, extension) + if err != nil { + return nil, err + } + return extension, nil +} + // NewBlockHashesPacket is the network packet for the block announcements. type NewBlockHashesPacket []struct { Hash common.Hash // Hash of one particular block being announced @@ -324,6 +357,9 @@ type PooledTransactionsRLPPacket66 struct { func (*StatusPacket) Name() string { return "Status" } func (*StatusPacket) Kind() byte { return StatusMsg } +func (*UpgradeStatusPacket) Name() string { return "UpgradeStatus" } +func (*UpgradeStatusPacket) Kind() byte { return UpgradeStatusMsg } + func (*NewBlockHashesPacket) Name() string { return "NewBlockHashes" } func (*NewBlockHashesPacket) Kind() byte { return NewBlockHashesMsg }