Skip to content

Commit b0d4433

Browse files
vdwijdenMariusVanDerWijdenkaralabe
authored
eth: implement eth/68 (#25980)
* eth: implement eth/68 * eth/protocols/eth: added tx size to announcement * eth/protocols/eth: check equal lengths on receiving announcement * eth/protocols/eth: add +1 to tx size because of the type byte * eth: happy lint, add eth68 tests, enable eth68 * eth: various nitpick fixes on eth/68 * eth/protocols/eth: fix announced tx size wrt type byte Co-authored-by: MariusVanDerWijden <m.vanderwijden@live.de> Co-authored-by: Péter Szilágyi <peterke@gmail.com>
1 parent 5329aa3 commit b0d4433

File tree

10 files changed

+136
-29
lines changed

10 files changed

+136
-29
lines changed

cmd/devp2p/internal/ethtest/types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ func (msg NewBlock) Code() int { return 23 }
127127
func (msg NewBlock) ReqID() uint64 { return 0 }
128128

129129
// NewPooledTransactionHashes is the network packet for the tx hash propagation message.
130-
type NewPooledTransactionHashes eth.NewPooledTransactionHashesPacket
130+
type NewPooledTransactionHashes eth.NewPooledTransactionHashesPacket66
131131

132132
func (msg NewPooledTransactionHashes) Code() int { return 24 }
133133
func (msg NewPooledTransactionHashes) ReqID() uint64 { return 0 }

eth/handler_eth.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,12 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
6767
case *eth.NewBlockPacket:
6868
return h.handleBlockBroadcast(peer, packet.Block, packet.TD)
6969

70-
case *eth.NewPooledTransactionHashesPacket:
70+
case *eth.NewPooledTransactionHashesPacket66:
7171
return h.txFetcher.Notify(peer.ID(), *packet)
7272

73+
case *eth.NewPooledTransactionHashesPacket68:
74+
return h.txFetcher.Notify(peer.ID(), packet.Hashes)
75+
7376
case *eth.TransactionsPacket:
7477
return h.txFetcher.Enqueue(peer.ID(), *packet, false)
7578

eth/handler_eth_test.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,14 @@ func (h *testEthHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
6161
h.blockBroadcasts.Send(packet.Block)
6262
return nil
6363

64-
case *eth.NewPooledTransactionHashesPacket:
64+
case *eth.NewPooledTransactionHashesPacket66:
6565
h.txAnnounces.Send(([]common.Hash)(*packet))
6666
return nil
6767

68+
case *eth.NewPooledTransactionHashesPacket68:
69+
h.txAnnounces.Send(packet.Hashes)
70+
return nil
71+
6872
case *eth.TransactionsPacket:
6973
h.txBroadcasts.Send(([]*types.Transaction)(*packet))
7074
return nil
@@ -81,6 +85,8 @@ func (h *testEthHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
8185
// Tests that peers are correctly accepted (or rejected) based on the advertised
8286
// fork IDs in the protocol handshake.
8387
func TestForkIDSplit66(t *testing.T) { testForkIDSplit(t, eth.ETH66) }
88+
func TestForkIDSplit67(t *testing.T) { testForkIDSplit(t, eth.ETH67) }
89+
func TestForkIDSplit68(t *testing.T) { testForkIDSplit(t, eth.ETH68) }
8490

8591
func testForkIDSplit(t *testing.T, protocol uint) {
8692
t.Parallel()
@@ -235,6 +241,8 @@ func testForkIDSplit(t *testing.T, protocol uint) {
235241

236242
// Tests that received transactions are added to the local pool.
237243
func TestRecvTransactions66(t *testing.T) { testRecvTransactions(t, eth.ETH66) }
244+
func TestRecvTransactions67(t *testing.T) { testRecvTransactions(t, eth.ETH67) }
245+
func TestRecvTransactions68(t *testing.T) { testRecvTransactions(t, eth.ETH68) }
238246

239247
func testRecvTransactions(t *testing.T, protocol uint) {
240248
t.Parallel()
@@ -292,6 +300,8 @@ func testRecvTransactions(t *testing.T, protocol uint) {
292300

293301
// This test checks that pending transactions are sent.
294302
func TestSendTransactions66(t *testing.T) { testSendTransactions(t, eth.ETH66) }
303+
func TestSendTransactions67(t *testing.T) { testSendTransactions(t, eth.ETH67) }
304+
func TestSendTransactions68(t *testing.T) { testSendTransactions(t, eth.ETH68) }
295305

296306
func testSendTransactions(t *testing.T, protocol uint) {
297307
t.Parallel()
@@ -350,7 +360,7 @@ func testSendTransactions(t *testing.T, protocol uint) {
350360
seen := make(map[common.Hash]struct{})
351361
for len(seen) < len(insert) {
352362
switch protocol {
353-
case 66:
363+
case 66, 67, 68:
354364
select {
355365
case hashes := <-anns:
356366
for _, hash := range hashes {
@@ -377,6 +387,8 @@ func testSendTransactions(t *testing.T, protocol uint) {
377387
// Tests that transactions get propagated to all attached peers, either via direct
378388
// broadcasts or via announcements/retrievals.
379389
func TestTransactionPropagation66(t *testing.T) { testTransactionPropagation(t, eth.ETH66) }
390+
func TestTransactionPropagation67(t *testing.T) { testTransactionPropagation(t, eth.ETH67) }
391+
func TestTransactionPropagation68(t *testing.T) { testTransactionPropagation(t, eth.ETH68) }
380392

381393
func testTransactionPropagation(t *testing.T, protocol uint) {
382394
t.Parallel()
@@ -678,6 +690,8 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) {
678690
// Tests that a propagated malformed block (uncles or transactions don't match
679691
// with the hashes in the header) gets discarded and not broadcast forward.
680692
func TestBroadcastMalformedBlock66(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH66) }
693+
func TestBroadcastMalformedBlock67(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH67) }
694+
func TestBroadcastMalformedBlock68(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH68) }
681695

682696
func testBroadcastMalformedBlock(t *testing.T, protocol uint) {
683697
t.Parallel()

eth/protocols/eth/broadcast.go

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -142,13 +142,17 @@ func (p *Peer) announceTransactions() {
142142
if done == nil && len(queue) > 0 {
143143
// Pile transaction hashes until we reach our allowed network limit
144144
var (
145-
count int
146-
pending []common.Hash
147-
size common.StorageSize
145+
count int
146+
pending []common.Hash
147+
pendingTypes []byte
148+
pendingSizes []uint32
149+
size common.StorageSize
148150
)
149151
for count = 0; count < len(queue) && size < maxTxPacketSize; count++ {
150-
if p.txpool.Get(queue[count]) != nil {
152+
if tx := p.txpool.Get(queue[count]); tx != nil {
151153
pending = append(pending, queue[count])
154+
pendingTypes = append(pendingTypes, tx.Type())
155+
pendingSizes = append(pendingSizes, uint32(tx.Size()))
152156
size += common.HashLength
153157
}
154158
}
@@ -159,9 +163,16 @@ func (p *Peer) announceTransactions() {
159163
if len(pending) > 0 {
160164
done = make(chan struct{})
161165
go func() {
162-
if err := p.sendPooledTransactionHashes(pending); err != nil {
163-
fail <- err
164-
return
166+
if p.version >= ETH68 {
167+
if err := p.sendPooledTransactionHashes68(pending, pendingTypes, pendingSizes); err != nil {
168+
fail <- err
169+
return
170+
}
171+
} else {
172+
if err := p.sendPooledTransactionHashes66(pending); err != nil {
173+
fail <- err
174+
return
175+
}
165176
}
166177
close(done)
167178
p.Log().Trace("Sent transaction announcements", "count", len(pending))

eth/protocols/eth/handler.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ var eth66 = map[uint64]msgHandler{
168168
NewBlockHashesMsg: handleNewBlockhashes,
169169
NewBlockMsg: handleNewBlock,
170170
TransactionsMsg: handleTransactions,
171-
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes,
171+
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes66,
172172
GetBlockHeadersMsg: handleGetBlockHeaders66,
173173
BlockHeadersMsg: handleBlockHeaders66,
174174
GetBlockBodiesMsg: handleGetBlockBodies66,
@@ -185,7 +185,22 @@ var eth67 = map[uint64]msgHandler{
185185
NewBlockHashesMsg: handleNewBlockhashes,
186186
NewBlockMsg: handleNewBlock,
187187
TransactionsMsg: handleTransactions,
188-
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes,
188+
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes66,
189+
GetBlockHeadersMsg: handleGetBlockHeaders66,
190+
BlockHeadersMsg: handleBlockHeaders66,
191+
GetBlockBodiesMsg: handleGetBlockBodies66,
192+
BlockBodiesMsg: handleBlockBodies66,
193+
GetReceiptsMsg: handleGetReceipts66,
194+
ReceiptsMsg: handleReceipts66,
195+
GetPooledTransactionsMsg: handleGetPooledTransactions66,
196+
PooledTransactionsMsg: handlePooledTransactions66,
197+
}
198+
199+
var eth68 = map[uint64]msgHandler{
200+
NewBlockHashesMsg: handleNewBlockhashes,
201+
NewBlockMsg: handleNewBlock,
202+
TransactionsMsg: handleTransactions,
203+
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes68,
189204
GetBlockHeadersMsg: handleGetBlockHeaders66,
190205
BlockHeadersMsg: handleBlockHeaders66,
191206
GetBlockBodiesMsg: handleGetBlockBodies66,
@@ -210,9 +225,12 @@ func handleMessage(backend Backend, peer *Peer) error {
210225
defer msg.Discard()
211226

212227
var handlers = eth66
213-
if peer.Version() >= ETH67 {
228+
if peer.Version() == ETH67 {
214229
handlers = eth67
215230
}
231+
if peer.Version() >= ETH68 {
232+
handlers = eth68
233+
}
216234

217235
// Track the amount of time it takes to serve the request and run the handler
218236
if metrics.Enabled {

eth/protocols/eth/handler_test.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ func (b *testBackend) Handle(*Peer, Packet) error {
112112

113113
// Tests that block headers can be retrieved from a remote chain based on user queries.
114114
func TestGetBlockHeaders66(t *testing.T) { testGetBlockHeaders(t, ETH66) }
115+
func TestGetBlockHeaders67(t *testing.T) { testGetBlockHeaders(t, ETH67) }
116+
func TestGetBlockHeaders68(t *testing.T) { testGetBlockHeaders(t, ETH68) }
115117

116118
func testGetBlockHeaders(t *testing.T, protocol uint) {
117119
t.Parallel()
@@ -297,6 +299,8 @@ func testGetBlockHeaders(t *testing.T, protocol uint) {
297299

298300
// Tests that block contents can be retrieved from a remote chain based on their hashes.
299301
func TestGetBlockBodies66(t *testing.T) { testGetBlockBodies(t, ETH66) }
302+
func TestGetBlockBodies67(t *testing.T) { testGetBlockBodies(t, ETH67) }
303+
func TestGetBlockBodies68(t *testing.T) { testGetBlockBodies(t, ETH68) }
300304

301305
func testGetBlockBodies(t *testing.T, protocol uint) {
302306
t.Parallel()
@@ -379,9 +383,11 @@ func testGetBlockBodies(t *testing.T, protocol uint) {
379383
}
380384

381385
// Tests that the state trie nodes can be retrieved based on hashes.
382-
func TestGetNodeData66(t *testing.T) { testGetNodeData(t, ETH66) }
386+
func TestGetNodeData66(t *testing.T) { testGetNodeData(t, ETH66, false) }
387+
func TestGetNodeData67(t *testing.T) { testGetNodeData(t, ETH67, true) }
388+
func TestGetNodeData68(t *testing.T) { testGetNodeData(t, ETH68, true) }
383389

384-
func testGetNodeData(t *testing.T, protocol uint) {
390+
func testGetNodeData(t *testing.T, protocol uint, drop bool) {
385391
t.Parallel()
386392

387393
// Define three accounts to simulate transactions with
@@ -442,8 +448,15 @@ func testGetNodeData(t *testing.T, protocol uint) {
442448
GetNodeDataPacket: hashes,
443449
})
444450
msg, err := peer.app.ReadMsg()
445-
if err != nil {
446-
t.Fatalf("failed to read node data response: %v", err)
451+
if !drop {
452+
if err != nil {
453+
t.Fatalf("failed to read node data response: %v", err)
454+
}
455+
} else {
456+
if err != nil {
457+
return
458+
}
459+
t.Fatalf("succeeded to read node data response on non-supporting protocol: %v", msg)
447460
}
448461
if msg.Code != NodeDataMsg {
449462
t.Fatalf("response packet code mismatch: have %x, want %x", msg.Code, NodeDataMsg)
@@ -489,6 +502,8 @@ func testGetNodeData(t *testing.T, protocol uint) {
489502

490503
// Tests that the transaction receipts can be retrieved based on hashes.
491504
func TestGetBlockReceipts66(t *testing.T) { testGetBlockReceipts(t, ETH66) }
505+
func TestGetBlockReceipts67(t *testing.T) { testGetBlockReceipts(t, ETH67) }
506+
func TestGetBlockReceipts68(t *testing.T) { testGetBlockReceipts(t, ETH68) }
492507

493508
func testGetBlockReceipts(t *testing.T, protocol uint) {
494509
t.Parallel()

eth/protocols/eth/handlers.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -430,13 +430,13 @@ func handleReceipts66(backend Backend, msg Decoder, peer *Peer) error {
430430
}, metadata)
431431
}
432432

433-
func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer) error {
433+
func handleNewPooledTransactionHashes66(backend Backend, msg Decoder, peer *Peer) error {
434434
// New transaction announcement arrived, make sure we have
435435
// a valid and fresh chain to handle them
436436
if !backend.AcceptTxs() {
437437
return nil
438438
}
439-
ann := new(NewPooledTransactionHashesPacket)
439+
ann := new(NewPooledTransactionHashesPacket66)
440440
if err := msg.Decode(ann); err != nil {
441441
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
442442
}
@@ -447,6 +447,26 @@ func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer)
447447
return backend.Handle(peer, ann)
448448
}
449449

450+
func handleNewPooledTransactionHashes68(backend Backend, msg Decoder, peer *Peer) error {
451+
// New transaction announcement arrived, make sure we have
452+
// a valid and fresh chain to handle them
453+
if !backend.AcceptTxs() {
454+
return nil
455+
}
456+
ann := new(NewPooledTransactionHashesPacket68)
457+
if err := msg.Decode(ann); err != nil {
458+
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
459+
}
460+
if len(ann.Hashes) != len(ann.Types) || len(ann.Hashes) != len(ann.Sizes) {
461+
return fmt.Errorf("%w: message %v: invalid len of fields: %v %v %v", errDecode, msg, len(ann.Hashes), len(ann.Types), len(ann.Sizes))
462+
}
463+
// Schedule all the unknown hashes for retrieval
464+
for _, hash := range ann.Hashes {
465+
peer.markTransaction(hash)
466+
}
467+
return backend.Handle(peer, ann)
468+
}
469+
450470
func handleGetPooledTransactions66(backend Backend, msg Decoder, peer *Peer) error {
451471
// Decode the pooled transactions retrieval message
452472
var query GetPooledTransactionsPacket66

eth/protocols/eth/peer.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -210,16 +210,29 @@ func (p *Peer) AsyncSendTransactions(hashes []common.Hash) {
210210
}
211211
}
212212

213-
// sendPooledTransactionHashes sends transaction hashes to the peer and includes
213+
// sendPooledTransactionHashes66 sends transaction hashes to the peer and includes
214214
// them in its transaction hash set for future reference.
215215
//
216216
// This method is a helper used by the async transaction announcer. Don't call it
217217
// directly as the queueing (memory) and transmission (bandwidth) costs should
218218
// not be managed directly.
219-
func (p *Peer) sendPooledTransactionHashes(hashes []common.Hash) error {
219+
func (p *Peer) sendPooledTransactionHashes66(hashes []common.Hash) error {
220220
// Mark all the transactions as known, but ensure we don't overflow our limits
221221
p.knownTxs.Add(hashes...)
222-
return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket(hashes))
222+
return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket66(hashes))
223+
}
224+
225+
// sendPooledTransactionHashes68 sends transaction hashes (tagged with their type
226+
// and size) to the peer and includes them in its transaction hash set for future
227+
// reference.
228+
//
229+
// This method is a helper used by the async transaction announcer. Don't call it
230+
// directly as the queueing (memory) and transmission (bandwidth) costs should
231+
// not be managed directly.
232+
func (p *Peer) sendPooledTransactionHashes68(hashes []common.Hash, types []byte, sizes []uint32) error {
233+
// Mark all the transactions as known, but ensure we don't overflow our limits
234+
p.knownTxs.Add(hashes...)
235+
return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket68{Types: types, Sizes: sizes, Hashes: hashes})
223236
}
224237

225238
// AsyncSendPooledTransactionHashes queues a list of transactions hashes to eventually

eth/protocols/eth/peer_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ func newTestPeer(name string, version uint, backend Backend) (*testPeer, <-chan
4848
peer := NewPeer(version, p2p.NewPeer(id, name, nil), net, backend.TxPool())
4949
errc := make(chan error, 1)
5050
go func() {
51+
defer app.Close()
52+
5153
errc <- backend.RunPeer(peer, func(peer *Peer) error {
5254
return Handle(backend, peer)
5355
})

eth/protocols/eth/protocol.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
const (
3333
ETH66 = 66
3434
ETH67 = 67
35+
ETH68 = 68
3536
)
3637

3738
// ProtocolName is the official short name of the `eth` protocol used during
@@ -40,11 +41,11 @@ const ProtocolName = "eth"
4041

4142
// ProtocolVersions are the supported versions of the `eth` protocol (first
4243
// is primary).
43-
var ProtocolVersions = []uint{ETH67, ETH66}
44+
var ProtocolVersions = []uint{ETH68, ETH67, ETH66}
4445

4546
// protocolLengths are the number of implemented message corresponding to
4647
// different protocol versions.
47-
var protocolLengths = map[uint]uint64{ETH67: 17, ETH66: 17}
48+
var protocolLengths = map[uint]uint64{ETH68: 17, ETH67: 17, ETH66: 17}
4849

4950
// maxMessageSize is the maximum cap on the size of a protocol message.
5051
const maxMessageSize = 10 * 1024 * 1024
@@ -298,8 +299,15 @@ type ReceiptsRLPPacket66 struct {
298299
ReceiptsRLPPacket
299300
}
300301

301-
// NewPooledTransactionHashesPacket represents a transaction announcement packet.
302-
type NewPooledTransactionHashesPacket []common.Hash
302+
// NewPooledTransactionHashesPacket66 represents a transaction announcement packet on eth/66 and eth/67.
303+
type NewPooledTransactionHashesPacket66 []common.Hash
304+
305+
// NewPooledTransactionHashesPacket68 represents a transaction announcement packet on eth/68 and newer.
306+
type NewPooledTransactionHashesPacket68 struct {
307+
Types []byte
308+
Sizes []uint32
309+
Hashes []common.Hash
310+
}
303311

304312
// GetPooledTransactionsPacket represents a transaction query.
305313
type GetPooledTransactionsPacket []common.Hash
@@ -364,8 +372,11 @@ func (*GetReceiptsPacket) Kind() byte { return GetReceiptsMsg }
364372
func (*ReceiptsPacket) Name() string { return "Receipts" }
365373
func (*ReceiptsPacket) Kind() byte { return ReceiptsMsg }
366374

367-
func (*NewPooledTransactionHashesPacket) Name() string { return "NewPooledTransactionHashes" }
368-
func (*NewPooledTransactionHashesPacket) Kind() byte { return NewPooledTransactionHashesMsg }
375+
func (*NewPooledTransactionHashesPacket66) Name() string { return "NewPooledTransactionHashes" }
376+
func (*NewPooledTransactionHashesPacket66) Kind() byte { return NewPooledTransactionHashesMsg }
377+
378+
func (*NewPooledTransactionHashesPacket68) Name() string { return "NewPooledTransactionHashes" }
379+
func (*NewPooledTransactionHashesPacket68) Kind() byte { return NewPooledTransactionHashesMsg }
369380

370381
func (*GetPooledTransactionsPacket) Name() string { return "GetPooledTransactions" }
371382
func (*GetPooledTransactionsPacket) Kind() byte { return GetPooledTransactionsMsg }

0 commit comments

Comments
 (0)