From 4959d39f498b3b24553a48ba321ef5d0bd10d6de Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Thu, 15 Feb 2024 13:20:18 -0500 Subject: [PATCH] add relayMessages condition and tests --- network/p2pNetwork.go | 28 ++++++---- network/p2pNetwork_test.go | 112 +++++++++++++++++++++++++++++++++++++ 2 files changed, 129 insertions(+), 11 deletions(-) diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index a58c50bc12..a3879f938f 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -68,6 +68,8 @@ type P2PNetwork struct { wsPeersChangeCounter atomic.Int32 wsPeersConnectivityCheckTicker *time.Ticker + relayMessages bool // True if we should relay messages from other nodes (nominally true for relays, false otherwise) + capabilitiesDiscovery *p2p.CapabilitiesDiscovery bootstrapper bootstrapper @@ -159,16 +161,17 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo } net := &P2PNetwork{ - log: log, - config: cfg, - genesisID: genesisID, - networkID: networkID, - topicTags: map[protocol.Tag]string{"TX": p2p.TXTopicName}, - wsPeers: make(map[peer.ID]*wsPeer), - wsPeersToIDs: make(map[*wsPeer]peer.ID), - peerStats: make(map[peer.ID]*p2pPeerStats), - nodeInfo: node, - pstore: pstore, + log: log, + config: cfg, + genesisID: genesisID, + networkID: networkID, + topicTags: map[protocol.Tag]string{"TX": p2p.TXTopicName}, + wsPeers: make(map[peer.ID]*wsPeer), + wsPeersToIDs: make(map[*wsPeer]peer.ID), + peerStats: make(map[peer.ID]*p2pPeerStats), + nodeInfo: node, + pstore: pstore, + relayMessages: cfg.IsGossipServer() || cfg.ForceRelayMessages, } net.ctx, net.ctxCancel = context.WithCancel(context.Background()) net.handler = msgHandler{ @@ -401,7 +404,10 @@ func (n *P2PNetwork) Broadcast(ctx context.Context, tag protocol.Tag, data []byt // Relay message func (n *P2PNetwork) Relay(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error { - return n.Broadcast(ctx, tag, data, wait, except) + if n.relayMessages { + return n.Broadcast(ctx, tag, data, wait, except) + } + return nil } // Disconnect from a peer, probably due to protocol errors. diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go index 0f139d6d31..c658633184 100644 --- a/network/p2pNetwork_test.go +++ b/network/p2pNetwork_test.go @@ -649,3 +649,115 @@ func TestP2PHTTPHandler(t *testing.T) { _, err = httpClient.Get("/test") require.ErrorIs(t, err, limitcaller.ErrConnectionQueueingTimeout) } + +func TestP2PRelay(t *testing.T) { + partitiontest.PartitionTest(t) + + cfg := config.GetDefaultLocal() + log := logging.TestingLog(t) + netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}) + require.NoError(t, err) + + err = netA.Start() + require.NoError(t, err) + defer netA.Stop() + + peerInfoA := netA.service.AddrInfo() + addrsA, err := peer.AddrInfoToP2pAddrs(&peerInfoA) + require.NoError(t, err) + require.NotZero(t, addrsA[0]) + + multiAddrStr := addrsA[0].String() + phoneBookAddresses := []string{multiAddrStr} + + netB, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}) + require.NoError(t, err) + err = netB.Start() + require.NoError(t, err) + defer netB.Stop() + + require.Eventually( + t, + func() bool { + return len(netA.service.ListPeersForTopic(p2p.TXTopicName)) > 0 && + len(netB.service.ListPeersForTopic(p2p.TXTopicName)) > 0 + }, + 2*time.Second, + 50*time.Millisecond, + ) + + require.Eventually(t, func() bool { + return len(netA.wsPeers) > 0 && len(netB.wsPeers) > 0 + }, 2*time.Second, 50*time.Millisecond) + + counter := newMessageCounter(t, 1) + counterDone := counter.done + netA.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}}) + + // send 5 messages from both netB to netA + // since there is no node with listening address set => no messages should be received + for i := 0; i < 5; i++ { + err := netB.Relay(context.Background(), protocol.TxnTag, []byte{1, 2, 3, byte(i)}, true, nil) + require.NoError(t, err) + } + + select { + case <-counterDone: + require.Fail(t, "No messages should have been received") + case <-time.After(1 * time.Second): + } + + // add netC with listening address set, and enable relaying on netB + // ensure all messages are received by netA + cfg.NetAddress = "127.0.0.1:0" + netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}) + require.NoError(t, err) + err = netC.Start() + require.NoError(t, err) + defer netC.Stop() + + netB.relayMessages = true + + require.Eventually( + t, + func() bool { + return len(netA.service.ListPeersForTopic(p2p.TXTopicName)) > 0 && + len(netB.service.ListPeersForTopic(p2p.TXTopicName)) > 0 && + len(netC.service.ListPeersForTopic(p2p.TXTopicName)) > 0 + }, + 2*time.Second, + 50*time.Millisecond, + ) + + require.Eventually(t, func() bool { + return len(netA.wsPeers) > 0 && len(netB.wsPeers) > 0 && len(netC.wsPeers) > 0 + }, 2*time.Second, 50*time.Millisecond) + + const expectedMsgs = 10 + counter = newMessageCounter(t, expectedMsgs) + counterDone = counter.done + netA.ClearHandlers() + netA.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}}) + + for i := 0; i < expectedMsgs/2; i++ { + err := netB.Relay(context.Background(), protocol.TxnTag, []byte{1, 2, 3, byte(i)}, true, nil) + require.NoError(t, err) + err = netC.Relay(context.Background(), protocol.TxnTag, []byte{11, 12, 10 + byte(i), 14}, true, nil) + require.NoError(t, err) + } + // send some duplicate messages, they should be dropped + for i := 0; i < expectedMsgs/2; i++ { + err := netB.Relay(context.Background(), protocol.TxnTag, []byte{1, 2, 3, byte(i)}, true, nil) + require.NoError(t, err) + } + + select { + case <-counterDone: + case <-time.After(2 * time.Second): + if counter.count < expectedMsgs { + require.Failf(t, "One or more messages failed to reach destination network", "%d > %d", expectedMsgs, counter.count) + } else if counter.count > expectedMsgs { + require.Failf(t, "One or more messages that were expected to be dropped, reached destination network", "%d < %d", expectedMsgs, counter.count) + } + } +}