Skip to content

Commit

Permalink
[R4R] add extension in eth protocol handshake to disable tx broadcast (
Browse files Browse the repository at this point in the history
…ethereum#412)

* add extension for eth protocol handshake

* fix comments
  • Loading branch information
yutianwu committed Oct 13, 2021
1 parent c737f66 commit 610f6a5
Show file tree
Hide file tree
Showing 11 changed files with 193 additions and 57 deletions.
23 changes: 12 additions & 11 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,17 +231,18 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
}

if eth.handler, err = newHandler(&handlerConfig{
Database: chainDb,
Chain: eth.blockchain,
TxPool: eth.txPool,
Network: config.NetworkId,
Sync: config.SyncMode,
BloomCache: uint64(cacheLimit),
EventMux: eth.eventMux,
Checkpoint: checkpoint,
Whitelist: config.Whitelist,
DirectBroadcast: config.DirectBroadcast,
DiffSync: config.DiffSync,
Database: chainDb,
Chain: eth.blockchain,
TxPool: eth.txPool,
Network: config.NetworkId,
Sync: config.SyncMode,
BloomCache: uint64(cacheLimit),
EventMux: eth.eventMux,
Checkpoint: checkpoint,
Whitelist: config.Whitelist,
DirectBroadcast: config.DirectBroadcast,
DiffSync: config.DiffSync,
DisablePeerTxBroadcast: config.DisablePeerTxBroadcast,
}); err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions eth/downloader/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ func (ps *peerSet) HeaderIdlePeers() ([]*peerConnection, int) {
defer p.lock.RUnlock()
return p.headerThroughput
}
return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
return ps.idlePeers(eth.ETH65, eth.ETH67, idle, throughput)
}

// BodyIdlePeers retrieves a flat list of all the currently body-idle peers within
Expand All @@ -471,7 +471,7 @@ func (ps *peerSet) BodyIdlePeers() ([]*peerConnection, int) {
defer p.lock.RUnlock()
return p.blockThroughput
}
return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
return ps.idlePeers(eth.ETH65, eth.ETH67, idle, throughput)
}

// ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers
Expand All @@ -485,7 +485,7 @@ func (ps *peerSet) ReceiptIdlePeers() ([]*peerConnection, int) {
defer p.lock.RUnlock()
return p.receiptThroughput
}
return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
return ps.idlePeers(eth.ETH65, eth.ETH67, idle, throughput)
}

// NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle
Expand All @@ -499,7 +499,7 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peerConnection, int) {
defer p.lock.RUnlock()
return p.stateThroughput
}
return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
return ps.idlePeers(eth.ETH65, eth.ETH67, idle, throughput)
}

// idlePeers retrieves a flat list of all currently idle peers satisfying the
Expand Down
5 changes: 3 additions & 2 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,9 @@ type Config struct {
Genesis *core.Genesis `toml:",omitempty"`

// Protocol options
NetworkId uint64 // Network ID to use for selecting peers to connect to
SyncMode downloader.SyncMode
NetworkId uint64 // Network ID to use for selecting peers to connect to
SyncMode downloader.SyncMode
DisablePeerTxBroadcast bool

// This can be set to list of enrtree:// URLs which will be queried for
// for nodes to connect to.
Expand Down
6 changes: 6 additions & 0 deletions eth/ethconfig/gen_config.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 29 additions & 26 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,22 +78,24 @@ type txPool interface {
// handlerConfig is the collection of initialization parameters to create a full
// node network handler.
type handlerConfig struct {
Database ethdb.Database // Database for direct sync insertions
Chain *core.BlockChain // Blockchain to serve data from
TxPool txPool // Transaction pool to propagate from
Network uint64 // Network identifier to adfvertise
Sync downloader.SyncMode // Whether to fast or full sync
DiffSync bool // Whether to diff sync
BloomCache uint64 // Megabytes to alloc for fast sync bloom
EventMux *event.TypeMux // Legacy event mux, deprecate for `feed`
Checkpoint *params.TrustedCheckpoint // Hard coded checkpoint for sync challenges
Whitelist map[uint64]common.Hash // Hard coded whitelist for sync challenged
DirectBroadcast bool
Database ethdb.Database // Database for direct sync insertions
Chain *core.BlockChain // Blockchain to serve data from
TxPool txPool // Transaction pool to propagate from
Network uint64 // Network identifier to adfvertise
Sync downloader.SyncMode // Whether to fast or full sync
DiffSync bool // Whether to diff sync
BloomCache uint64 // Megabytes to alloc for fast sync bloom
EventMux *event.TypeMux // Legacy event mux, deprecate for `feed`
Checkpoint *params.TrustedCheckpoint // Hard coded checkpoint for sync challenges
Whitelist map[uint64]common.Hash // Hard coded whitelist for sync challenged
DirectBroadcast bool
DisablePeerTxBroadcast bool
}

type handler struct {
networkID uint64
forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node
networkID uint64
forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node
disablePeerTxBroadcast bool

fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)
snapSync uint32 // Flag whether fast sync should operate on top of the snap protocol
Expand Down Expand Up @@ -138,18 +140,19 @@ func newHandler(config *handlerConfig) (*handler, error) {
config.EventMux = new(event.TypeMux) // Nicety initialization for tests
}
h := &handler{
networkID: config.Network,
forkFilter: forkid.NewFilter(config.Chain),
eventMux: config.EventMux,
database: config.Database,
txpool: config.TxPool,
chain: config.Chain,
peers: newPeerSet(),
whitelist: config.Whitelist,
directBroadcast: config.DirectBroadcast,
diffSync: config.DiffSync,
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
networkID: config.Network,
forkFilter: forkid.NewFilter(config.Chain),
disablePeerTxBroadcast: config.DisablePeerTxBroadcast,
eventMux: config.EventMux,
database: config.Database,
txpool: config.TxPool,
chain: config.Chain,
peers: newPeerSet(),
whitelist: config.Whitelist,
directBroadcast: config.DirectBroadcast,
diffSync: config.DiffSync,
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
}
if config.Sync == downloader.FullSync {
// The database seems empty as the current block is the genesis. Yet the fast
Expand Down Expand Up @@ -276,7 +279,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
td = h.chain.GetTd(hash, number)
)
forkID := forkid.NewID(h.chain.Config(), h.chain.Genesis().Hash(), h.chain.CurrentHeader().Number.Uint64())
if err := peer.Handshake(h.networkID, td, hash, genesis.Hash(), forkID, h.forkFilter); err != nil {
if err := peer.Handshake(h.networkID, td, hash, genesis.Hash(), forkID, h.forkFilter, &eth.UpgradeStatusExtension{DisablePeerTxBroadcast: h.disablePeerTxBroadcast}); err != nil {
peer.Log().Debug("Ethereum handshake failed", "err", err)
return err
}
Expand Down
10 changes: 5 additions & 5 deletions eth/handler_eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func testRecvTransactions(t *testing.T, protocol uint) {
head = handler.chain.CurrentBlock()
td = handler.chain.GetTd(head.Hash(), head.NumberU64())
)
if err := src.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain)); err != nil {
if err := src.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake")
}
// Send the transaction to the sink and verify that it's added to the tx pool
Expand Down Expand Up @@ -333,7 +333,7 @@ func testSendTransactions(t *testing.T, protocol uint) {
head = handler.chain.CurrentBlock()
td = handler.chain.GetTd(head.Hash(), head.NumberU64())
)
if err := sink.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain)); err != nil {
if err := sink.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake")
}
// After the handshake completes, the source handler should stream the sink
Expand Down Expand Up @@ -532,7 +532,7 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo
head = handler.chain.CurrentBlock()
td = handler.chain.GetTd(head.Hash(), head.NumberU64())
)
if err := remote.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain)); err != nil {
if err := remote.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake")
}
// Connect a new peer and check that we receive the checkpoint challenge
Expand Down Expand Up @@ -616,7 +616,7 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) {
go source.handler.runEthPeer(sourcePeer, func(peer *eth.Peer) error {
return eth.Handle((*ethHandler)(source.handler), peer)
})
if err := sinkPeer.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain)); err != nil {
if err := sinkPeer.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake")
}
go eth.Handle(sink, sinkPeer)
Expand Down Expand Up @@ -689,7 +689,7 @@ func testBroadcastMalformedBlock(t *testing.T, protocol uint) {
genesis = source.chain.Genesis()
td = source.chain.GetTd(genesis.Hash(), genesis.NumberU64())
)
if err := sink.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain)); err != nil {
if err := sink.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake")
}
// After the handshake completes, the source handler should stream the sink
Expand Down
6 changes: 6 additions & 0 deletions eth/protocols/eth/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ func (p *Peer) broadcastTransactions() {
case <-fail:
failed = true

case <-p.txTerm:
return

case <-p.term:
return
}
Expand Down Expand Up @@ -189,6 +192,9 @@ func (p *Peer) announceTransactions() {
case <-fail:
failed = true

case <-p.txTerm:
return

case <-p.term:
return
}
Expand Down
62 changes: 61 additions & 1 deletion eth/protocols/eth/handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const (

// Handshake executes the eth protocol handshake, negotiating version number,
// network IDs, difficulties, head and genesis blocks.
func (p *Peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis common.Hash, forkID forkid.ID, forkFilter forkid.Filter) error {
func (p *Peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis common.Hash, forkID forkid.ID, forkFilter forkid.Filter, extension *UpgradeStatusExtension) error {
// Send out own handshake in a new thread
errc := make(chan error, 2)

Expand Down Expand Up @@ -68,6 +68,49 @@ func (p *Peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis
}
p.td, p.head = status.TD, status.Head

if p.version >= ETH67 {
var upgradeStatus UpgradeStatusPacket // safe to read after two values have been received from errc
if extension == nil {
extension = &UpgradeStatusExtension{}
}
extensionRaw, err := extension.Encode()
if err != nil {
return err
}

gopool.Submit(func() {
errc <- p2p.Send(p.rw, UpgradeStatusMsg, &UpgradeStatusPacket{
Extension: extensionRaw,
})
})
gopool.Submit(func() {
errc <- p.readUpgradeStatus(&upgradeStatus)
})
timeout := time.NewTimer(handshakeTimeout)
defer timeout.Stop()
for i := 0; i < 2; i++ {
select {
case err := <-errc:
if err != nil {
return err
}
case <-timeout.C:
return p2p.DiscReadTimeout
}
}

extension, err := upgradeStatus.GetExtension()
if err != nil {
return err
}
p.statusExtension = extension

if p.statusExtension.DisablePeerTxBroadcast {
p.Log().Debug("peer does not need broadcast txs, closing broadcast routines")
p.CloseTxBroadcast()
}
}

// TD at mainnet block #7753254 is 76 bits. If it becomes 100 million times
// larger, it will still fit within 100 bits
if tdlen := p.td.BitLen(); tdlen > 100 {
Expand Down Expand Up @@ -106,3 +149,20 @@ func (p *Peer) readStatus(network uint64, status *StatusPacket, genesis common.H
}
return nil
}

func (p *Peer) readUpgradeStatus(status *UpgradeStatusPacket) error {
msg, err := p.rw.ReadMsg()
if err != nil {
return err
}
if msg.Code != UpgradeStatusMsg {
return fmt.Errorf("%w: upgrade status msg has code %x (!= %x)", errNoStatusMsg, msg.Code, UpgradeStatusMsg)
}
if msg.Size > maxMessageSize {
return fmt.Errorf("%w: %v > %v", errMsgTooLarge, msg.Size, maxMessageSize)
}
if err := msg.Decode(&status); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
return nil
}
2 changes: 1 addition & 1 deletion eth/protocols/eth/handshake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func testHandshake(t *testing.T, protocol uint) {
// Send the junk test with one peer, check the handshake failure
go p2p.Send(app, test.code, test.data)

err := peer.Handshake(1, td, head.Hash(), genesis.Hash(), forkID, forkid.NewFilter(backend.chain))
err := peer.Handshake(1, td, head.Hash(), genesis.Hash(), forkID, forkid.NewFilter(backend.chain), nil)
if err == nil {
t.Errorf("test %d: protocol returned nil error, want %q", i, test.want)
} else if !errors.Is(err, test.want) {
Expand Down
Loading

0 comments on commit 610f6a5

Please sign in to comment.