Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth: implement eth/68 to support only announcing certain tx types #25980

Merged
merged 7 commits into from
Oct 31, 2022
2 changes: 1 addition & 1 deletion cmd/devp2p/internal/ethtest/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (msg NewBlock) Code() int { return 23 }
func (msg NewBlock) ReqID() uint64 { return 0 }

// NewPooledTransactionHashes is the network packet for the tx hash propagation message.
type NewPooledTransactionHashes eth.NewPooledTransactionHashesPacket
type NewPooledTransactionHashes eth.NewPooledTransactionHashesPacket66

func (msg NewPooledTransactionHashes) Code() int { return 24 }
func (msg NewPooledTransactionHashes) ReqID() uint64 { return 0 }
Expand Down
5 changes: 4 additions & 1 deletion eth/handler_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,12 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
case *eth.NewBlockPacket:
return h.handleBlockBroadcast(peer, packet.Block, packet.TD)

case *eth.NewPooledTransactionHashesPacket:
case *eth.NewPooledTransactionHashesPacket66:
return h.txFetcher.Notify(peer.ID(), *packet)

case *eth.NewPooledTransactionHashesPacket68:
MariusVanDerWijden marked this conversation as resolved.
Show resolved Hide resolved
return h.txFetcher.Notify(peer.ID(), packet.Hashes)

case *eth.TransactionsPacket:
return h.txFetcher.Enqueue(peer.ID(), *packet, false)

Expand Down
18 changes: 16 additions & 2 deletions eth/handler_eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,14 @@ func (h *testEthHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
h.blockBroadcasts.Send(packet.Block)
return nil

case *eth.NewPooledTransactionHashesPacket:
case *eth.NewPooledTransactionHashesPacket66:
h.txAnnounces.Send(([]common.Hash)(*packet))
return nil

case *eth.NewPooledTransactionHashesPacket68:
h.txAnnounces.Send(packet.Hashes)
return nil

case *eth.TransactionsPacket:
h.txBroadcasts.Send(([]*types.Transaction)(*packet))
return nil
Expand All @@ -81,6 +85,8 @@ func (h *testEthHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
// Tests that peers are correctly accepted (or rejected) based on the advertised
// fork IDs in the protocol handshake.
func TestForkIDSplit66(t *testing.T) { testForkIDSplit(t, eth.ETH66) }
func TestForkIDSplit67(t *testing.T) { testForkIDSplit(t, eth.ETH67) }
func TestForkIDSplit68(t *testing.T) { testForkIDSplit(t, eth.ETH68) }

func testForkIDSplit(t *testing.T, protocol uint) {
t.Parallel()
Expand Down Expand Up @@ -235,6 +241,8 @@ func testForkIDSplit(t *testing.T, protocol uint) {

// Tests that received transactions are added to the local pool.
func TestRecvTransactions66(t *testing.T) { testRecvTransactions(t, eth.ETH66) }
func TestRecvTransactions67(t *testing.T) { testRecvTransactions(t, eth.ETH67) }
func TestRecvTransactions68(t *testing.T) { testRecvTransactions(t, eth.ETH68) }

func testRecvTransactions(t *testing.T, protocol uint) {
t.Parallel()
Expand Down Expand Up @@ -292,6 +300,8 @@ func testRecvTransactions(t *testing.T, protocol uint) {

// This test checks that pending transactions are sent.
func TestSendTransactions66(t *testing.T) { testSendTransactions(t, eth.ETH66) }
func TestSendTransactions67(t *testing.T) { testSendTransactions(t, eth.ETH67) }
func TestSendTransactions68(t *testing.T) { testSendTransactions(t, eth.ETH68) }

func testSendTransactions(t *testing.T, protocol uint) {
t.Parallel()
Expand Down Expand Up @@ -350,7 +360,7 @@ func testSendTransactions(t *testing.T, protocol uint) {
seen := make(map[common.Hash]struct{})
for len(seen) < len(insert) {
switch protocol {
case 66:
case 66, 67, 68:
select {
case hashes := <-anns:
for _, hash := range hashes {
Expand All @@ -377,6 +387,8 @@ func testSendTransactions(t *testing.T, protocol uint) {
// Tests that transactions get propagated to all attached peers, either via direct
// broadcasts or via announcements/retrievals.
func TestTransactionPropagation66(t *testing.T) { testTransactionPropagation(t, eth.ETH66) }
func TestTransactionPropagation67(t *testing.T) { testTransactionPropagation(t, eth.ETH67) }
func TestTransactionPropagation68(t *testing.T) { testTransactionPropagation(t, eth.ETH68) }

func testTransactionPropagation(t *testing.T, protocol uint) {
t.Parallel()
Expand Down Expand Up @@ -678,6 +690,8 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) {
// Tests that a propagated malformed block (uncles or transactions don't match
// with the hashes in the header) gets discarded and not broadcast forward.
func TestBroadcastMalformedBlock66(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH66) }
func TestBroadcastMalformedBlock67(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH67) }
func TestBroadcastMalformedBlock68(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH68) }

func testBroadcastMalformedBlock(t *testing.T, protocol uint) {
t.Parallel()
Expand Down
25 changes: 18 additions & 7 deletions eth/protocols/eth/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,17 @@ func (p *Peer) announceTransactions() {
if done == nil && len(queue) > 0 {
// Pile transaction hashes until we reach our allowed network limit
var (
count int
pending []common.Hash
size common.StorageSize
count int
pending []common.Hash
pendingTypes []byte
pendingSizes []uint32
size common.StorageSize
)
for count = 0; count < len(queue) && size < maxTxPacketSize; count++ {
if p.txpool.Get(queue[count]) != nil {
if tx := p.txpool.Get(queue[count]); tx != nil {
pending = append(pending, queue[count])
pendingTypes = append(pendingTypes, tx.Type())
pendingSizes = append(pendingSizes, uint32(tx.Size()))
size += common.HashLength
MariusVanDerWijden marked this conversation as resolved.
Show resolved Hide resolved
}
}
Expand All @@ -159,9 +163,16 @@ func (p *Peer) announceTransactions() {
if len(pending) > 0 {
done = make(chan struct{})
go func() {
if err := p.sendPooledTransactionHashes(pending); err != nil {
fail <- err
return
if p.version >= ETH68 {
if err := p.sendPooledTransactionHashes68(pending, pendingTypes, pendingSizes); err != nil {
fail <- err
return
}
} else {
if err := p.sendPooledTransactionHashes66(pending); err != nil {
fail <- err
return
}
}
close(done)
p.Log().Trace("Sent transaction announcements", "count", len(pending))
Expand Down
24 changes: 21 additions & 3 deletions eth/protocols/eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ var eth66 = map[uint64]msgHandler{
NewBlockHashesMsg: handleNewBlockhashes,
NewBlockMsg: handleNewBlock,
TransactionsMsg: handleTransactions,
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes,
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes66,
GetBlockHeadersMsg: handleGetBlockHeaders66,
BlockHeadersMsg: handleBlockHeaders66,
GetBlockBodiesMsg: handleGetBlockBodies66,
Expand All @@ -185,7 +185,22 @@ var eth67 = map[uint64]msgHandler{
NewBlockHashesMsg: handleNewBlockhashes,
NewBlockMsg: handleNewBlock,
TransactionsMsg: handleTransactions,
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes,
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes66,
GetBlockHeadersMsg: handleGetBlockHeaders66,
BlockHeadersMsg: handleBlockHeaders66,
GetBlockBodiesMsg: handleGetBlockBodies66,
BlockBodiesMsg: handleBlockBodies66,
GetReceiptsMsg: handleGetReceipts66,
ReceiptsMsg: handleReceipts66,
GetPooledTransactionsMsg: handleGetPooledTransactions66,
PooledTransactionsMsg: handlePooledTransactions66,
}

var eth68 = map[uint64]msgHandler{
NewBlockHashesMsg: handleNewBlockhashes,
NewBlockMsg: handleNewBlock,
TransactionsMsg: handleTransactions,
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes68,
GetBlockHeadersMsg: handleGetBlockHeaders66,
BlockHeadersMsg: handleBlockHeaders66,
GetBlockBodiesMsg: handleGetBlockBodies66,
Expand All @@ -210,9 +225,12 @@ func handleMessage(backend Backend, peer *Peer) error {
defer msg.Discard()

var handlers = eth66
if peer.Version() >= ETH67 {
if peer.Version() == ETH67 {
handlers = eth67
}
if peer.Version() >= ETH68 {
handlers = eth68
}

// Track the amount of time it takes to serve the request and run the handler
if metrics.Enabled {
Expand Down
23 changes: 19 additions & 4 deletions eth/protocols/eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ func (b *testBackend) Handle(*Peer, Packet) error {

// Tests that block headers can be retrieved from a remote chain based on user queries.
func TestGetBlockHeaders66(t *testing.T) { testGetBlockHeaders(t, ETH66) }
func TestGetBlockHeaders67(t *testing.T) { testGetBlockHeaders(t, ETH67) }
func TestGetBlockHeaders68(t *testing.T) { testGetBlockHeaders(t, ETH68) }

func testGetBlockHeaders(t *testing.T, protocol uint) {
t.Parallel()
Expand Down Expand Up @@ -297,6 +299,8 @@ func testGetBlockHeaders(t *testing.T, protocol uint) {

// Tests that block contents can be retrieved from a remote chain based on their hashes.
func TestGetBlockBodies66(t *testing.T) { testGetBlockBodies(t, ETH66) }
func TestGetBlockBodies67(t *testing.T) { testGetBlockBodies(t, ETH67) }
func TestGetBlockBodies68(t *testing.T) { testGetBlockBodies(t, ETH68) }

func testGetBlockBodies(t *testing.T, protocol uint) {
t.Parallel()
Expand Down Expand Up @@ -379,9 +383,11 @@ func testGetBlockBodies(t *testing.T, protocol uint) {
}

// Tests that the state trie nodes can be retrieved based on hashes.
func TestGetNodeData66(t *testing.T) { testGetNodeData(t, ETH66) }
func TestGetNodeData66(t *testing.T) { testGetNodeData(t, ETH66, false) }
func TestGetNodeData67(t *testing.T) { testGetNodeData(t, ETH67, true) }
func TestGetNodeData68(t *testing.T) { testGetNodeData(t, ETH68, true) }

func testGetNodeData(t *testing.T, protocol uint) {
func testGetNodeData(t *testing.T, protocol uint, drop bool) {
t.Parallel()

// Define three accounts to simulate transactions with
Expand Down Expand Up @@ -442,8 +448,15 @@ func testGetNodeData(t *testing.T, protocol uint) {
GetNodeDataPacket: hashes,
})
msg, err := peer.app.ReadMsg()
if err != nil {
t.Fatalf("failed to read node data response: %v", err)
if !drop {
if err != nil {
t.Fatalf("failed to read node data response: %v", err)
}
} else {
if err != nil {
return
}
t.Fatalf("succeeded to read node data response on non-supporting protocol: %v", msg)
}
if msg.Code != NodeDataMsg {
t.Fatalf("response packet code mismatch: have %x, want %x", msg.Code, NodeDataMsg)
Expand Down Expand Up @@ -489,6 +502,8 @@ func testGetNodeData(t *testing.T, protocol uint) {

// Tests that the transaction receipts can be retrieved based on hashes.
func TestGetBlockReceipts66(t *testing.T) { testGetBlockReceipts(t, ETH66) }
func TestGetBlockReceipts67(t *testing.T) { testGetBlockReceipts(t, ETH67) }
func TestGetBlockReceipts68(t *testing.T) { testGetBlockReceipts(t, ETH68) }

func testGetBlockReceipts(t *testing.T, protocol uint) {
t.Parallel()
Expand Down
24 changes: 22 additions & 2 deletions eth/protocols/eth/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,13 +430,13 @@ func handleReceipts66(backend Backend, msg Decoder, peer *Peer) error {
}, metadata)
}

func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer) error {
func handleNewPooledTransactionHashes66(backend Backend, msg Decoder, peer *Peer) error {
// New transaction announcement arrived, make sure we have
// a valid and fresh chain to handle them
if !backend.AcceptTxs() {
return nil
}
ann := new(NewPooledTransactionHashesPacket)
ann := new(NewPooledTransactionHashesPacket66)
if err := msg.Decode(ann); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
Expand All @@ -447,6 +447,26 @@ func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer)
return backend.Handle(peer, ann)
}

func handleNewPooledTransactionHashes68(backend Backend, msg Decoder, peer *Peer) error {
// New transaction announcement arrived, make sure we have
// a valid and fresh chain to handle them
if !backend.AcceptTxs() {
return nil
}
ann := new(NewPooledTransactionHashesPacket68)
if err := msg.Decode(ann); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
MariusVanDerWijden marked this conversation as resolved.
Show resolved Hide resolved
if len(ann.Hashes) != len(ann.Types) || len(ann.Hashes) != len(ann.Sizes) {
return fmt.Errorf("%w: message %v: invalid len of fields: %v %v %v", errDecode, msg, len(ann.Hashes), len(ann.Types), len(ann.Sizes))
}
// Schedule all the unknown hashes for retrieval
for _, hash := range ann.Hashes {
peer.markTransaction(hash)
}
return backend.Handle(peer, ann)
}

func handleGetPooledTransactions66(backend Backend, msg Decoder, peer *Peer) error {
// Decode the pooled transactions retrieval message
var query GetPooledTransactionsPacket66
Expand Down
19 changes: 16 additions & 3 deletions eth/protocols/eth/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,16 +210,29 @@ func (p *Peer) AsyncSendTransactions(hashes []common.Hash) {
}
}

// sendPooledTransactionHashes sends transaction hashes to the peer and includes
// sendPooledTransactionHashes66 sends transaction hashes to the peer and includes
// them in its transaction hash set for future reference.
//
// This method is a helper used by the async transaction announcer. Don't call it
// directly as the queueing (memory) and transmission (bandwidth) costs should
// not be managed directly.
func (p *Peer) sendPooledTransactionHashes(hashes []common.Hash) error {
func (p *Peer) sendPooledTransactionHashes66(hashes []common.Hash) error {
// Mark all the transactions as known, but ensure we don't overflow our limits
p.knownTxs.Add(hashes...)
return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket(hashes))
return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket66(hashes))
}

// sendPooledTransactionHashes68 sends transaction hashes (tagged with their type
// and size) to the peer and includes them in its transaction hash set for future
// reference.
//
// This method is a helper used by the async transaction announcer. Don't call it
// directly as the queueing (memory) and transmission (bandwidth) costs should
// not be managed directly.
func (p *Peer) sendPooledTransactionHashes68(hashes []common.Hash, types []byte, sizes []uint32) error {
// Mark all the transactions as known, but ensure we don't overflow our limits
p.knownTxs.Add(hashes...)
return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket68{Types: types, Sizes: sizes, Hashes: hashes})
}

// AsyncSendPooledTransactionHashes queues a list of transactions hashes to eventually
Expand Down
2 changes: 2 additions & 0 deletions eth/protocols/eth/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ func newTestPeer(name string, version uint, backend Backend) (*testPeer, <-chan
peer := NewPeer(version, p2p.NewPeer(id, name, nil), net, backend.TxPool())
errc := make(chan error, 1)
go func() {
defer app.Close()

errc <- backend.RunPeer(peer, func(peer *Peer) error {
return Handle(backend, peer)
})
Expand Down
23 changes: 17 additions & 6 deletions eth/protocols/eth/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
const (
ETH66 = 66
ETH67 = 67
ETH68 = 68
)

// ProtocolName is the official short name of the `eth` protocol used during
Expand All @@ -40,11 +41,11 @@ const ProtocolName = "eth"

// ProtocolVersions are the supported versions of the `eth` protocol (first
// is primary).
var ProtocolVersions = []uint{ETH67, ETH66}
var ProtocolVersions = []uint{ETH68, ETH67, ETH66}

// protocolLengths are the number of implemented message corresponding to
// different protocol versions.
var protocolLengths = map[uint]uint64{ETH67: 17, ETH66: 17}
var protocolLengths = map[uint]uint64{ETH68: 17, ETH67: 17, ETH66: 17}

// maxMessageSize is the maximum cap on the size of a protocol message.
const maxMessageSize = 10 * 1024 * 1024
Expand Down Expand Up @@ -298,8 +299,15 @@ type ReceiptsRLPPacket66 struct {
ReceiptsRLPPacket
}

// NewPooledTransactionHashesPacket represents a transaction announcement packet.
type NewPooledTransactionHashesPacket []common.Hash
// NewPooledTransactionHashesPacket66 represents a transaction announcement packet on eth/66 and eth/67.
type NewPooledTransactionHashesPacket66 []common.Hash

// NewPooledTransactionHashesPacket68 represents a transaction announcement packet on eth/68 and newer.
type NewPooledTransactionHashesPacket68 struct {
MariusVanDerWijden marked this conversation as resolved.
Show resolved Hide resolved
Types []byte
Sizes []uint32
Hashes []common.Hash
}

// GetPooledTransactionsPacket represents a transaction query.
type GetPooledTransactionsPacket []common.Hash
Expand Down Expand Up @@ -364,8 +372,11 @@ func (*GetReceiptsPacket) Kind() byte { return GetReceiptsMsg }
func (*ReceiptsPacket) Name() string { return "Receipts" }
func (*ReceiptsPacket) Kind() byte { return ReceiptsMsg }

func (*NewPooledTransactionHashesPacket) Name() string { return "NewPooledTransactionHashes" }
func (*NewPooledTransactionHashesPacket) Kind() byte { return NewPooledTransactionHashesMsg }
func (*NewPooledTransactionHashesPacket66) Name() string { return "NewPooledTransactionHashes" }
func (*NewPooledTransactionHashesPacket66) Kind() byte { return NewPooledTransactionHashesMsg }

func (*NewPooledTransactionHashesPacket68) Name() string { return "NewPooledTransactionHashes" }
func (*NewPooledTransactionHashesPacket68) Kind() byte { return NewPooledTransactionHashesMsg }

func (*GetPooledTransactionsPacket) Name() string { return "GetPooledTransactions" }
func (*GetPooledTransactionsPacket) Kind() byte { return GetPooledTransactionsMsg }
Expand Down