From e87368661809f569edb767d9e75250574c3e27ef Mon Sep 17 00:00:00 2001 From: MariusVanDerWijden Date: Thu, 13 Oct 2022 07:00:19 +0200 Subject: [PATCH 1/7] eth: implement eth/68 --- eth/handler_eth.go | 3 +++ eth/handler_eth_test.go | 4 ++++ eth/protocols/eth/broadcast.go | 24 +++++++++++++++++------- eth/protocols/eth/handler.go | 15 +++++++++++++++ eth/protocols/eth/handlers.go | 17 +++++++++++++++++ eth/protocols/eth/peer.go | 6 ++++++ eth/protocols/eth/protocol.go | 13 +++++++++++-- 7 files changed, 73 insertions(+), 9 deletions(-) diff --git a/eth/handler_eth.go b/eth/handler_eth.go index 12e91ec7f534..9625a65c7310 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -70,6 +70,9 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { case *eth.NewPooledTransactionHashesPacket: return h.txFetcher.Notify(peer.ID(), *packet) + case *eth.NewPooledTransactionHashesPacket68: + return h.txFetcher.Notify(peer.ID(), *&packet.Hashes) + case *eth.TransactionsPacket: return h.txFetcher.Enqueue(peer.ID(), *packet, false) diff --git a/eth/handler_eth_test.go b/eth/handler_eth_test.go index 9f0c36f950c5..b6b1e59feea6 100644 --- a/eth/handler_eth_test.go +++ b/eth/handler_eth_test.go @@ -65,6 +65,10 @@ func (h *testEthHandler) Handle(peer *eth.Peer, packet eth.Packet) error { h.txAnnounces.Send(([]common.Hash)(*packet)) return nil + case *eth.NewPooledTransactionHashesPacket68: + h.txAnnounces.Send(([]common.Hash)(*&packet.Hashes)) + return nil + case *eth.TransactionsPacket: h.txBroadcasts.Send(([]*types.Transaction)(*packet)) return nil diff --git a/eth/protocols/eth/broadcast.go b/eth/protocols/eth/broadcast.go index 0afe01b1ce15..336c02352b19 100644 --- a/eth/protocols/eth/broadcast.go +++ b/eth/protocols/eth/broadcast.go @@ -142,13 +142,15 @@ func (p *Peer) announceTransactions() { if done == nil && len(queue) > 0 { // Pile transaction hashes until we reach our allowed network limit var ( - count int - pending []common.Hash - size common.StorageSize + count int + pending []common.Hash + pendingTypes []byte + size common.StorageSize ) for count = 0; count < len(queue) && size < maxTxPacketSize; count++ { - if p.txpool.Get(queue[count]) != nil { + if tx := p.txpool.Get(queue[count]); tx != nil { pending = append(pending, queue[count]) + pendingTypes = append(pendingTypes, tx.Type()) size += common.HashLength } } @@ -159,10 +161,18 @@ func (p *Peer) announceTransactions() { if len(pending) > 0 { done = make(chan struct{}) go func() { - if err := p.sendPooledTransactionHashes(pending); err != nil { - fail <- err - return + if p.version >= ETH68 { + if err := p.sendPooledTransactions(pendingTypes, pending); err != nil { + fail <- err + return + } + } else { + if err := p.sendPooledTransactionHashes(pending); err != nil { + fail <- err + return + } } + close(done) p.Log().Trace("Sent transaction announcements", "count", len(pending)) }() diff --git a/eth/protocols/eth/handler.go b/eth/protocols/eth/handler.go index 87b1f20a2dc2..29a0bec30e86 100644 --- a/eth/protocols/eth/handler.go +++ b/eth/protocols/eth/handler.go @@ -196,6 +196,21 @@ var eth67 = map[uint64]msgHandler{ PooledTransactionsMsg: handlePooledTransactions66, } +var eth68 = map[uint64]msgHandler{ + NewBlockHashesMsg: handleNewBlockhashes, + NewBlockMsg: handleNewBlock, + TransactionsMsg: handleTransactions, + NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes68, + GetBlockHeadersMsg: handleGetBlockHeaders66, + BlockHeadersMsg: handleBlockHeaders66, + GetBlockBodiesMsg: handleGetBlockBodies66, + BlockBodiesMsg: handleBlockBodies66, + GetReceiptsMsg: handleGetReceipts66, + ReceiptsMsg: handleReceipts66, + GetPooledTransactionsMsg: handleGetPooledTransactions66, + PooledTransactionsMsg: handlePooledTransactions66, +} + // handleMessage is invoked whenever an inbound message is received from a remote // peer. The remote connection is torn down upon returning any error. func handleMessage(backend Backend, peer *Peer) error { diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index d454b3407f3c..9e83b38028c1 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -447,6 +447,23 @@ func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer) return backend.Handle(peer, ann) } +func handleNewPooledTransactionHashes68(backend Backend, msg Decoder, peer *Peer) error { + // New transaction announcement arrived, make sure we have + // a valid and fresh chain to handle them + if !backend.AcceptTxs() { + return nil + } + ann := new(NewPooledTransactionHashesPacket68) + if err := msg.Decode(ann); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + // Schedule all the unknown hashes for retrieval + for _, hash := range ann.Hashes { + peer.markTransaction(hash) + } + return backend.Handle(peer, ann) +} + func handleGetPooledTransactions66(backend Backend, msg Decoder, peer *Peer) error { // Decode the pooled transactions retrieval message var query GetPooledTransactionsPacket66 diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index 0a3b7bd56e1b..c8e9dc2665f4 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -222,6 +222,12 @@ func (p *Peer) sendPooledTransactionHashes(hashes []common.Hash) error { return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket(hashes)) } +func (p *Peer) sendPooledTransactions(types []byte, hashes []common.Hash) error { + // Mark all the transactions as known, but ensure we don't overflow our limits + p.knownTxs.Add(hashes...) + return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket68{Types: types, Hashes: hashes}) +} + // AsyncSendPooledTransactionHashes queues a list of transactions hashes to eventually // announce to a remote peer. The number of pending sends are capped (new ones // will force old sends to be dropped) diff --git a/eth/protocols/eth/protocol.go b/eth/protocols/eth/protocol.go index f6fac4278080..24ace1438382 100644 --- a/eth/protocols/eth/protocol.go +++ b/eth/protocols/eth/protocol.go @@ -32,6 +32,7 @@ import ( const ( ETH66 = 66 ETH67 = 67 + ETH68 = 68 ) // ProtocolName is the official short name of the `eth` protocol used during @@ -40,11 +41,11 @@ const ProtocolName = "eth" // ProtocolVersions are the supported versions of the `eth` protocol (first // is primary). -var ProtocolVersions = []uint{ETH67, ETH66} +var ProtocolVersions = []uint{ETH68, ETH67, ETH66} // protocolLengths are the number of implemented message corresponding to // different protocol versions. -var protocolLengths = map[uint]uint64{ETH67: 17, ETH66: 17} +var protocolLengths = map[uint]uint64{ETH68: 17, ETH67: 17, ETH66: 17} // maxMessageSize is the maximum cap on the size of a protocol message. const maxMessageSize = 10 * 1024 * 1024 @@ -301,6 +302,11 @@ type ReceiptsRLPPacket66 struct { // NewPooledTransactionHashesPacket represents a transaction announcement packet. type NewPooledTransactionHashesPacket []common.Hash +type NewPooledTransactionHashesPacket68 struct { + Types []byte + Hashes []common.Hash +} + // GetPooledTransactionsPacket represents a transaction query. type GetPooledTransactionsPacket []common.Hash @@ -367,6 +373,9 @@ func (*ReceiptsPacket) Kind() byte { return ReceiptsMsg } func (*NewPooledTransactionHashesPacket) Name() string { return "NewPooledTransactionHashes" } func (*NewPooledTransactionHashesPacket) Kind() byte { return NewPooledTransactionHashesMsg } +func (*NewPooledTransactionHashesPacket68) Name() string { return "NewPooledTransactionHashes" } +func (*NewPooledTransactionHashesPacket68) Kind() byte { return NewPooledTransactionHashesMsg } + func (*GetPooledTransactionsPacket) Name() string { return "GetPooledTransactions" } func (*GetPooledTransactionsPacket) Kind() byte { return GetPooledTransactionsMsg } From c65ce8d5fe49cdc22ed717555754f49f71353fdb Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Mon, 24 Oct 2022 13:55:18 +0200 Subject: [PATCH 2/7] eth/protocols/eth: added tx size to announcement --- cmd/devp2p/internal/ethtest/types.go | 2 +- eth/handler_eth.go | 2 +- eth/handler_eth_test.go | 2 +- eth/protocols/eth/broadcast.go | 4 +++- eth/protocols/eth/handlers.go | 2 +- eth/protocols/eth/peer.go | 6 +++--- eth/protocols/eth/protocol.go | 10 ++++++---- 7 files changed, 16 insertions(+), 12 deletions(-) diff --git a/cmd/devp2p/internal/ethtest/types.go b/cmd/devp2p/internal/ethtest/types.go index 2c5cb94c699f..c819a7c7b0c0 100644 --- a/cmd/devp2p/internal/ethtest/types.go +++ b/cmd/devp2p/internal/ethtest/types.go @@ -127,7 +127,7 @@ func (msg NewBlock) Code() int { return 23 } func (msg NewBlock) ReqID() uint64 { return 0 } // NewPooledTransactionHashes is the network packet for the tx hash propagation message. -type NewPooledTransactionHashes eth.NewPooledTransactionHashesPacket +type NewPooledTransactionHashes eth.NewPooledTransactionHashesPacket67 func (msg NewPooledTransactionHashes) Code() int { return 24 } func (msg NewPooledTransactionHashes) ReqID() uint64 { return 0 } diff --git a/eth/handler_eth.go b/eth/handler_eth.go index 9625a65c7310..f52008e5007d 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -67,7 +67,7 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { case *eth.NewBlockPacket: return h.handleBlockBroadcast(peer, packet.Block, packet.TD) - case *eth.NewPooledTransactionHashesPacket: + case *eth.NewPooledTransactionHashesPacket67: return h.txFetcher.Notify(peer.ID(), *packet) case *eth.NewPooledTransactionHashesPacket68: diff --git a/eth/handler_eth_test.go b/eth/handler_eth_test.go index b6b1e59feea6..1db99298f261 100644 --- a/eth/handler_eth_test.go +++ b/eth/handler_eth_test.go @@ -61,7 +61,7 @@ func (h *testEthHandler) Handle(peer *eth.Peer, packet eth.Packet) error { h.blockBroadcasts.Send(packet.Block) return nil - case *eth.NewPooledTransactionHashesPacket: + case *eth.NewPooledTransactionHashesPacket67: h.txAnnounces.Send(([]common.Hash)(*packet)) return nil diff --git a/eth/protocols/eth/broadcast.go b/eth/protocols/eth/broadcast.go index 336c02352b19..143e9c69f79e 100644 --- a/eth/protocols/eth/broadcast.go +++ b/eth/protocols/eth/broadcast.go @@ -145,12 +145,14 @@ func (p *Peer) announceTransactions() { count int pending []common.Hash pendingTypes []byte + pendingSizes []uint32 size common.StorageSize ) for count = 0; count < len(queue) && size < maxTxPacketSize; count++ { if tx := p.txpool.Get(queue[count]); tx != nil { pending = append(pending, queue[count]) pendingTypes = append(pendingTypes, tx.Type()) + pendingSizes = append(pendingSizes, uint32(tx.Size())) size += common.HashLength } } @@ -162,7 +164,7 @@ func (p *Peer) announceTransactions() { done = make(chan struct{}) go func() { if p.version >= ETH68 { - if err := p.sendPooledTransactions(pendingTypes, pending); err != nil { + if err := p.sendPooledTransactions(pending, pendingTypes, pendingSizes); err != nil { fail <- err return } diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index 9e83b38028c1..e14058045055 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -436,7 +436,7 @@ func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer) if !backend.AcceptTxs() { return nil } - ann := new(NewPooledTransactionHashesPacket) + ann := new(NewPooledTransactionHashesPacket67) if err := msg.Decode(ann); err != nil { return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) } diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index c8e9dc2665f4..76166bfe3210 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -219,13 +219,13 @@ func (p *Peer) AsyncSendTransactions(hashes []common.Hash) { func (p *Peer) sendPooledTransactionHashes(hashes []common.Hash) error { // Mark all the transactions as known, but ensure we don't overflow our limits p.knownTxs.Add(hashes...) - return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket(hashes)) + return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket67(hashes)) } -func (p *Peer) sendPooledTransactions(types []byte, hashes []common.Hash) error { +func (p *Peer) sendPooledTransactions(hashes []common.Hash, types []byte, sizes []uint32) error { // Mark all the transactions as known, but ensure we don't overflow our limits p.knownTxs.Add(hashes...) - return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket68{Types: types, Hashes: hashes}) + return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket68{Types: types, Sizes: sizes, Hashes: hashes}) } // AsyncSendPooledTransactionHashes queues a list of transactions hashes to eventually diff --git a/eth/protocols/eth/protocol.go b/eth/protocols/eth/protocol.go index 24ace1438382..b6624872e9d1 100644 --- a/eth/protocols/eth/protocol.go +++ b/eth/protocols/eth/protocol.go @@ -299,11 +299,13 @@ type ReceiptsRLPPacket66 struct { ReceiptsRLPPacket } -// NewPooledTransactionHashesPacket represents a transaction announcement packet. -type NewPooledTransactionHashesPacket []common.Hash +// NewPooledTransactionHashesPacket67 represents a transaction announcement packet. +type NewPooledTransactionHashesPacket67 []common.Hash +// NewPooledTransactionHashesPacket68 represents a transaction announcement packet over eth/68. type NewPooledTransactionHashesPacket68 struct { Types []byte + Sizes []uint32 Hashes []common.Hash } @@ -370,8 +372,8 @@ func (*GetReceiptsPacket) Kind() byte { return GetReceiptsMsg } func (*ReceiptsPacket) Name() string { return "Receipts" } func (*ReceiptsPacket) Kind() byte { return ReceiptsMsg } -func (*NewPooledTransactionHashesPacket) Name() string { return "NewPooledTransactionHashes" } -func (*NewPooledTransactionHashesPacket) Kind() byte { return NewPooledTransactionHashesMsg } +func (*NewPooledTransactionHashesPacket67) Name() string { return "NewPooledTransactionHashes" } +func (*NewPooledTransactionHashesPacket67) Kind() byte { return NewPooledTransactionHashesMsg } func (*NewPooledTransactionHashesPacket68) Name() string { return "NewPooledTransactionHashes" } func (*NewPooledTransactionHashesPacket68) Kind() byte { return NewPooledTransactionHashesMsg } From 005dc67c3a47d3de6eed440e0b16a05b528552bf Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Mon, 24 Oct 2022 14:04:14 +0200 Subject: [PATCH 3/7] eth/protocols/eth: check equal lengths on receiving announcement --- eth/protocols/eth/handlers.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index e14058045055..e8e5501f74af 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -457,6 +457,9 @@ func handleNewPooledTransactionHashes68(backend Backend, msg Decoder, peer *Peer if err := msg.Decode(ann); err != nil { return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) } + if len(ann.Hashes) != len(ann.Types) || len(ann.Hashes) != len(ann.Sizes) { + return fmt.Errorf("%w: message %v: invalid len of fields: %v %v %v", errDecode, msg, len(ann.Hashes), len(ann.Types), len(ann.Sizes)) + } // Schedule all the unknown hashes for retrieval for _, hash := range ann.Hashes { peer.markTransaction(hash) From 8118499a3ee9efacaac7fd1ba3559e459d7bc493 Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Mon, 24 Oct 2022 14:07:17 +0200 Subject: [PATCH 4/7] eth/protocols/eth: add +1 to tx size because of the type byte --- eth/protocols/eth/broadcast.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eth/protocols/eth/broadcast.go b/eth/protocols/eth/broadcast.go index 143e9c69f79e..d13fbb0a18f1 100644 --- a/eth/protocols/eth/broadcast.go +++ b/eth/protocols/eth/broadcast.go @@ -152,7 +152,7 @@ func (p *Peer) announceTransactions() { if tx := p.txpool.Get(queue[count]); tx != nil { pending = append(pending, queue[count]) pendingTypes = append(pendingTypes, tx.Type()) - pendingSizes = append(pendingSizes, uint32(tx.Size())) + pendingSizes = append(pendingSizes, 1+uint32(tx.Size())) // txsize += 1 because of txtype byte size += common.HashLength } } From 3678d0da60423ffe6527dfbb91e72df1e3eccf71 Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Mon, 24 Oct 2022 14:18:31 +0200 Subject: [PATCH 5/7] eth: happy lint, add eth68 tests, enable eth68 --- eth/handler_eth.go | 2 +- eth/handler_eth_test.go | 7 +++++-- eth/protocols/eth/handler.go | 5 ++++- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/eth/handler_eth.go b/eth/handler_eth.go index f52008e5007d..741471afc788 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -71,7 +71,7 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { return h.txFetcher.Notify(peer.ID(), *packet) case *eth.NewPooledTransactionHashesPacket68: - return h.txFetcher.Notify(peer.ID(), *&packet.Hashes) + return h.txFetcher.Notify(peer.ID(), packet.Hashes) case *eth.TransactionsPacket: return h.txFetcher.Enqueue(peer.ID(), *packet, false) diff --git a/eth/handler_eth_test.go b/eth/handler_eth_test.go index 1db99298f261..9f4f81d1c294 100644 --- a/eth/handler_eth_test.go +++ b/eth/handler_eth_test.go @@ -66,7 +66,7 @@ func (h *testEthHandler) Handle(peer *eth.Peer, packet eth.Packet) error { return nil case *eth.NewPooledTransactionHashesPacket68: - h.txAnnounces.Send(([]common.Hash)(*&packet.Hashes)) + h.txAnnounces.Send(packet.Hashes) return nil case *eth.TransactionsPacket: @@ -296,6 +296,7 @@ func testRecvTransactions(t *testing.T, protocol uint) { // This test checks that pending transactions are sent. func TestSendTransactions66(t *testing.T) { testSendTransactions(t, eth.ETH66) } +func TestSendTransactions68(t *testing.T) { testSendTransactions(t, eth.ETH68) } func testSendTransactions(t *testing.T, protocol uint) { t.Parallel() @@ -354,7 +355,7 @@ func testSendTransactions(t *testing.T, protocol uint) { seen := make(map[common.Hash]struct{}) for len(seen) < len(insert) { switch protocol { - case 66: + case 66, 68: select { case hashes := <-anns: for _, hash := range hashes { @@ -382,6 +383,8 @@ func testSendTransactions(t *testing.T, protocol uint) { // broadcasts or via announcements/retrievals. func TestTransactionPropagation66(t *testing.T) { testTransactionPropagation(t, eth.ETH66) } +func TestTransactionPropagation68(t *testing.T) { testTransactionPropagation(t, eth.ETH68) } + func testTransactionPropagation(t *testing.T, protocol uint) { t.Parallel() diff --git a/eth/protocols/eth/handler.go b/eth/protocols/eth/handler.go index 29a0bec30e86..734261251112 100644 --- a/eth/protocols/eth/handler.go +++ b/eth/protocols/eth/handler.go @@ -225,9 +225,12 @@ func handleMessage(backend Backend, peer *Peer) error { defer msg.Discard() var handlers = eth66 - if peer.Version() >= ETH67 { + if peer.Version() == ETH67 { handlers = eth67 } + if peer.Version() >= ETH68 { + handlers = eth68 + } // Track the amount of time it takes to serve the request and run the handler if metrics.Enabled { From bdf3dd1662e84942869f68d4174546884c548687 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 26 Oct 2022 12:49:20 +0300 Subject: [PATCH 6/7] eth: various nitpick fixes on eth/68 --- cmd/devp2p/internal/ethtest/types.go | 2 +- eth/handler_eth.go | 2 +- eth/handler_eth_test.go | 13 ++++++++++--- eth/protocols/eth/broadcast.go | 5 ++--- eth/protocols/eth/handler.go | 4 ++-- eth/protocols/eth/handler_test.go | 23 +++++++++++++++++++---- eth/protocols/eth/handlers.go | 4 ++-- eth/protocols/eth/peer.go | 15 +++++++++++---- eth/protocols/eth/peer_test.go | 2 ++ eth/protocols/eth/protocol.go | 10 +++++----- 10 files changed, 55 insertions(+), 25 deletions(-) diff --git a/cmd/devp2p/internal/ethtest/types.go b/cmd/devp2p/internal/ethtest/types.go index c819a7c7b0c0..fd5251d161f3 100644 --- a/cmd/devp2p/internal/ethtest/types.go +++ b/cmd/devp2p/internal/ethtest/types.go @@ -127,7 +127,7 @@ func (msg NewBlock) Code() int { return 23 } func (msg NewBlock) ReqID() uint64 { return 0 } // NewPooledTransactionHashes is the network packet for the tx hash propagation message. -type NewPooledTransactionHashes eth.NewPooledTransactionHashesPacket67 +type NewPooledTransactionHashes eth.NewPooledTransactionHashesPacket66 func (msg NewPooledTransactionHashes) Code() int { return 24 } func (msg NewPooledTransactionHashes) ReqID() uint64 { return 0 } diff --git a/eth/handler_eth.go b/eth/handler_eth.go index 741471afc788..4ed6335769cf 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -67,7 +67,7 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { case *eth.NewBlockPacket: return h.handleBlockBroadcast(peer, packet.Block, packet.TD) - case *eth.NewPooledTransactionHashesPacket67: + case *eth.NewPooledTransactionHashesPacket66: return h.txFetcher.Notify(peer.ID(), *packet) case *eth.NewPooledTransactionHashesPacket68: diff --git a/eth/handler_eth_test.go b/eth/handler_eth_test.go index 9f4f81d1c294..885c2a971505 100644 --- a/eth/handler_eth_test.go +++ b/eth/handler_eth_test.go @@ -61,7 +61,7 @@ func (h *testEthHandler) Handle(peer *eth.Peer, packet eth.Packet) error { h.blockBroadcasts.Send(packet.Block) return nil - case *eth.NewPooledTransactionHashesPacket67: + case *eth.NewPooledTransactionHashesPacket66: h.txAnnounces.Send(([]common.Hash)(*packet)) return nil @@ -85,6 +85,8 @@ func (h *testEthHandler) Handle(peer *eth.Peer, packet eth.Packet) error { // Tests that peers are correctly accepted (or rejected) based on the advertised // fork IDs in the protocol handshake. func TestForkIDSplit66(t *testing.T) { testForkIDSplit(t, eth.ETH66) } +func TestForkIDSplit67(t *testing.T) { testForkIDSplit(t, eth.ETH67) } +func TestForkIDSplit68(t *testing.T) { testForkIDSplit(t, eth.ETH68) } func testForkIDSplit(t *testing.T, protocol uint) { t.Parallel() @@ -239,6 +241,8 @@ func testForkIDSplit(t *testing.T, protocol uint) { // Tests that received transactions are added to the local pool. func TestRecvTransactions66(t *testing.T) { testRecvTransactions(t, eth.ETH66) } +func TestRecvTransactions67(t *testing.T) { testRecvTransactions(t, eth.ETH67) } +func TestRecvTransactions68(t *testing.T) { testRecvTransactions(t, eth.ETH68) } func testRecvTransactions(t *testing.T, protocol uint) { t.Parallel() @@ -296,6 +300,7 @@ func testRecvTransactions(t *testing.T, protocol uint) { // This test checks that pending transactions are sent. func TestSendTransactions66(t *testing.T) { testSendTransactions(t, eth.ETH66) } +func TestSendTransactions67(t *testing.T) { testSendTransactions(t, eth.ETH67) } func TestSendTransactions68(t *testing.T) { testSendTransactions(t, eth.ETH68) } func testSendTransactions(t *testing.T, protocol uint) { @@ -355,7 +360,7 @@ func testSendTransactions(t *testing.T, protocol uint) { seen := make(map[common.Hash]struct{}) for len(seen) < len(insert) { switch protocol { - case 66, 68: + case 66, 67, 68: select { case hashes := <-anns: for _, hash := range hashes { @@ -382,7 +387,7 @@ func testSendTransactions(t *testing.T, protocol uint) { // Tests that transactions get propagated to all attached peers, either via direct // broadcasts or via announcements/retrievals. func TestTransactionPropagation66(t *testing.T) { testTransactionPropagation(t, eth.ETH66) } - +func TestTransactionPropagation67(t *testing.T) { testTransactionPropagation(t, eth.ETH67) } func TestTransactionPropagation68(t *testing.T) { testTransactionPropagation(t, eth.ETH68) } func testTransactionPropagation(t *testing.T, protocol uint) { @@ -685,6 +690,8 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) { // Tests that a propagated malformed block (uncles or transactions don't match // with the hashes in the header) gets discarded and not broadcast forward. func TestBroadcastMalformedBlock66(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH66) } +func TestBroadcastMalformedBlock67(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH67) } +func TestBroadcastMalformedBlock68(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH68) } func testBroadcastMalformedBlock(t *testing.T, protocol uint) { t.Parallel() diff --git a/eth/protocols/eth/broadcast.go b/eth/protocols/eth/broadcast.go index d13fbb0a18f1..77198bb01a74 100644 --- a/eth/protocols/eth/broadcast.go +++ b/eth/protocols/eth/broadcast.go @@ -164,17 +164,16 @@ func (p *Peer) announceTransactions() { done = make(chan struct{}) go func() { if p.version >= ETH68 { - if err := p.sendPooledTransactions(pending, pendingTypes, pendingSizes); err != nil { + if err := p.sendPooledTransactionHashes68(pending, pendingTypes, pendingSizes); err != nil { fail <- err return } } else { - if err := p.sendPooledTransactionHashes(pending); err != nil { + if err := p.sendPooledTransactionHashes66(pending); err != nil { fail <- err return } } - close(done) p.Log().Trace("Sent transaction announcements", "count", len(pending)) }() diff --git a/eth/protocols/eth/handler.go b/eth/protocols/eth/handler.go index 734261251112..60654b803051 100644 --- a/eth/protocols/eth/handler.go +++ b/eth/protocols/eth/handler.go @@ -168,7 +168,7 @@ var eth66 = map[uint64]msgHandler{ NewBlockHashesMsg: handleNewBlockhashes, NewBlockMsg: handleNewBlock, TransactionsMsg: handleTransactions, - NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes, + NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes66, GetBlockHeadersMsg: handleGetBlockHeaders66, BlockHeadersMsg: handleBlockHeaders66, GetBlockBodiesMsg: handleGetBlockBodies66, @@ -185,7 +185,7 @@ var eth67 = map[uint64]msgHandler{ NewBlockHashesMsg: handleNewBlockhashes, NewBlockMsg: handleNewBlock, TransactionsMsg: handleTransactions, - NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes, + NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes66, GetBlockHeadersMsg: handleGetBlockHeaders66, BlockHeadersMsg: handleBlockHeaders66, GetBlockBodiesMsg: handleGetBlockBodies66, diff --git a/eth/protocols/eth/handler_test.go b/eth/protocols/eth/handler_test.go index faea081be539..5c3d1be0a123 100644 --- a/eth/protocols/eth/handler_test.go +++ b/eth/protocols/eth/handler_test.go @@ -112,6 +112,8 @@ func (b *testBackend) Handle(*Peer, Packet) error { // Tests that block headers can be retrieved from a remote chain based on user queries. func TestGetBlockHeaders66(t *testing.T) { testGetBlockHeaders(t, ETH66) } +func TestGetBlockHeaders67(t *testing.T) { testGetBlockHeaders(t, ETH67) } +func TestGetBlockHeaders68(t *testing.T) { testGetBlockHeaders(t, ETH68) } func testGetBlockHeaders(t *testing.T, protocol uint) { t.Parallel() @@ -297,6 +299,8 @@ func testGetBlockHeaders(t *testing.T, protocol uint) { // Tests that block contents can be retrieved from a remote chain based on their hashes. func TestGetBlockBodies66(t *testing.T) { testGetBlockBodies(t, ETH66) } +func TestGetBlockBodies67(t *testing.T) { testGetBlockBodies(t, ETH67) } +func TestGetBlockBodies68(t *testing.T) { testGetBlockBodies(t, ETH68) } func testGetBlockBodies(t *testing.T, protocol uint) { t.Parallel() @@ -379,9 +383,11 @@ func testGetBlockBodies(t *testing.T, protocol uint) { } // Tests that the state trie nodes can be retrieved based on hashes. -func TestGetNodeData66(t *testing.T) { testGetNodeData(t, ETH66) } +func TestGetNodeData66(t *testing.T) { testGetNodeData(t, ETH66, false) } +func TestGetNodeData67(t *testing.T) { testGetNodeData(t, ETH67, true) } +func TestGetNodeData68(t *testing.T) { testGetNodeData(t, ETH68, true) } -func testGetNodeData(t *testing.T, protocol uint) { +func testGetNodeData(t *testing.T, protocol uint, drop bool) { t.Parallel() // Define three accounts to simulate transactions with @@ -442,8 +448,15 @@ func testGetNodeData(t *testing.T, protocol uint) { GetNodeDataPacket: hashes, }) msg, err := peer.app.ReadMsg() - if err != nil { - t.Fatalf("failed to read node data response: %v", err) + if !drop { + if err != nil { + t.Fatalf("failed to read node data response: %v", err) + } + } else { + if err != nil { + return + } + t.Fatalf("succeeded to read node data response on non-supporting protocol: %v", msg) } if msg.Code != NodeDataMsg { t.Fatalf("response packet code mismatch: have %x, want %x", msg.Code, NodeDataMsg) @@ -489,6 +502,8 @@ func testGetNodeData(t *testing.T, protocol uint) { // Tests that the transaction receipts can be retrieved based on hashes. func TestGetBlockReceipts66(t *testing.T) { testGetBlockReceipts(t, ETH66) } +func TestGetBlockReceipts67(t *testing.T) { testGetBlockReceipts(t, ETH67) } +func TestGetBlockReceipts68(t *testing.T) { testGetBlockReceipts(t, ETH68) } func testGetBlockReceipts(t *testing.T, protocol uint) { t.Parallel() diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index e8e5501f74af..85a59969ebf8 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -430,13 +430,13 @@ func handleReceipts66(backend Backend, msg Decoder, peer *Peer) error { }, metadata) } -func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer) error { +func handleNewPooledTransactionHashes66(backend Backend, msg Decoder, peer *Peer) error { // New transaction announcement arrived, make sure we have // a valid and fresh chain to handle them if !backend.AcceptTxs() { return nil } - ann := new(NewPooledTransactionHashesPacket67) + ann := new(NewPooledTransactionHashesPacket66) if err := msg.Decode(ann); err != nil { return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) } diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index 76166bfe3210..070ce0825f9a 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -210,19 +210,26 @@ func (p *Peer) AsyncSendTransactions(hashes []common.Hash) { } } -// sendPooledTransactionHashes sends transaction hashes to the peer and includes +// sendPooledTransactionHashes66 sends transaction hashes to the peer and includes // them in its transaction hash set for future reference. // // This method is a helper used by the async transaction announcer. Don't call it // directly as the queueing (memory) and transmission (bandwidth) costs should // not be managed directly. -func (p *Peer) sendPooledTransactionHashes(hashes []common.Hash) error { +func (p *Peer) sendPooledTransactionHashes66(hashes []common.Hash) error { // Mark all the transactions as known, but ensure we don't overflow our limits p.knownTxs.Add(hashes...) - return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket67(hashes)) + return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket66(hashes)) } -func (p *Peer) sendPooledTransactions(hashes []common.Hash, types []byte, sizes []uint32) error { +// sendPooledTransactionHashes68 sends transaction hashes (tagged with their type +// and size) to the peer and includes them in its transaction hash set for future +// reference. +// +// This method is a helper used by the async transaction announcer. Don't call it +// directly as the queueing (memory) and transmission (bandwidth) costs should +// not be managed directly. +func (p *Peer) sendPooledTransactionHashes68(hashes []common.Hash, types []byte, sizes []uint32) error { // Mark all the transactions as known, but ensure we don't overflow our limits p.knownTxs.Add(hashes...) return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket68{Types: types, Sizes: sizes, Hashes: hashes}) diff --git a/eth/protocols/eth/peer_test.go b/eth/protocols/eth/peer_test.go index 0916ebee5d45..efbbbc6fff88 100644 --- a/eth/protocols/eth/peer_test.go +++ b/eth/protocols/eth/peer_test.go @@ -48,6 +48,8 @@ func newTestPeer(name string, version uint, backend Backend) (*testPeer, <-chan peer := NewPeer(version, p2p.NewPeer(id, name, nil), net, backend.TxPool()) errc := make(chan error, 1) go func() { + defer app.Close() + errc <- backend.RunPeer(peer, func(peer *Peer) error { return Handle(backend, peer) }) diff --git a/eth/protocols/eth/protocol.go b/eth/protocols/eth/protocol.go index b6624872e9d1..6c59fcae655a 100644 --- a/eth/protocols/eth/protocol.go +++ b/eth/protocols/eth/protocol.go @@ -299,10 +299,10 @@ type ReceiptsRLPPacket66 struct { ReceiptsRLPPacket } -// NewPooledTransactionHashesPacket67 represents a transaction announcement packet. -type NewPooledTransactionHashesPacket67 []common.Hash +// NewPooledTransactionHashesPacket66 represents a transaction announcement packet on eth/66 and eth/67. +type NewPooledTransactionHashesPacket66 []common.Hash -// NewPooledTransactionHashesPacket68 represents a transaction announcement packet over eth/68. +// NewPooledTransactionHashesPacket68 represents a transaction announcement packet on eth/68 and newer. type NewPooledTransactionHashesPacket68 struct { Types []byte Sizes []uint32 @@ -372,8 +372,8 @@ func (*GetReceiptsPacket) Kind() byte { return GetReceiptsMsg } func (*ReceiptsPacket) Name() string { return "Receipts" } func (*ReceiptsPacket) Kind() byte { return ReceiptsMsg } -func (*NewPooledTransactionHashesPacket67) Name() string { return "NewPooledTransactionHashes" } -func (*NewPooledTransactionHashesPacket67) Kind() byte { return NewPooledTransactionHashesMsg } +func (*NewPooledTransactionHashesPacket66) Name() string { return "NewPooledTransactionHashes" } +func (*NewPooledTransactionHashesPacket66) Kind() byte { return NewPooledTransactionHashesMsg } func (*NewPooledTransactionHashesPacket68) Name() string { return "NewPooledTransactionHashes" } func (*NewPooledTransactionHashesPacket68) Kind() byte { return NewPooledTransactionHashesMsg } From fc583202f24523388bb8365af97bec5f1aeccc2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 26 Oct 2022 15:58:01 +0300 Subject: [PATCH 7/7] eth/protocols/eth: fix announced tx size wrt type byte --- eth/protocols/eth/broadcast.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eth/protocols/eth/broadcast.go b/eth/protocols/eth/broadcast.go index 77198bb01a74..3045303f222e 100644 --- a/eth/protocols/eth/broadcast.go +++ b/eth/protocols/eth/broadcast.go @@ -152,7 +152,7 @@ func (p *Peer) announceTransactions() { if tx := p.txpool.Get(queue[count]); tx != nil { pending = append(pending, queue[count]) pendingTypes = append(pendingTypes, tx.Type()) - pendingSizes = append(pendingSizes, 1+uint32(tx.Size())) // txsize += 1 because of txtype byte + pendingSizes = append(pendingSizes, uint32(tx.Size())) size += common.HashLength } }