Skip to content

Commit

Permalink
add relayMessages condition and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy committed Feb 15, 2024
1 parent 773a8ed commit 4959d39
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 11 deletions.
28 changes: 17 additions & 11 deletions network/p2pNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ type P2PNetwork struct {
wsPeersChangeCounter atomic.Int32
wsPeersConnectivityCheckTicker *time.Ticker

relayMessages bool // True if we should relay messages from other nodes (nominally true for relays, false otherwise)

capabilitiesDiscovery *p2p.CapabilitiesDiscovery

bootstrapper bootstrapper
Expand Down Expand Up @@ -159,16 +161,17 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo
}

net := &P2PNetwork{
log: log,
config: cfg,
genesisID: genesisID,
networkID: networkID,
topicTags: map[protocol.Tag]string{"TX": p2p.TXTopicName},
wsPeers: make(map[peer.ID]*wsPeer),
wsPeersToIDs: make(map[*wsPeer]peer.ID),
peerStats: make(map[peer.ID]*p2pPeerStats),
nodeInfo: node,
pstore: pstore,
log: log,
config: cfg,
genesisID: genesisID,
networkID: networkID,
topicTags: map[protocol.Tag]string{"TX": p2p.TXTopicName},
wsPeers: make(map[peer.ID]*wsPeer),
wsPeersToIDs: make(map[*wsPeer]peer.ID),
peerStats: make(map[peer.ID]*p2pPeerStats),
nodeInfo: node,
pstore: pstore,
relayMessages: cfg.IsGossipServer() || cfg.ForceRelayMessages,
}
net.ctx, net.ctxCancel = context.WithCancel(context.Background())
net.handler = msgHandler{
Expand Down Expand Up @@ -401,7 +404,10 @@ func (n *P2PNetwork) Broadcast(ctx context.Context, tag protocol.Tag, data []byt

// Relay message
func (n *P2PNetwork) Relay(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error {
return n.Broadcast(ctx, tag, data, wait, except)
if n.relayMessages {
return n.Broadcast(ctx, tag, data, wait, except)
}
return nil
}

// Disconnect from a peer, probably due to protocol errors.
Expand Down
112 changes: 112 additions & 0 deletions network/p2pNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,3 +649,115 @@ func TestP2PHTTPHandler(t *testing.T) {
_, err = httpClient.Get("/test")
require.ErrorIs(t, err, limitcaller.ErrConnectionQueueingTimeout)
}

func TestP2PRelay(t *testing.T) {
partitiontest.PartitionTest(t)

cfg := config.GetDefaultLocal()
log := logging.TestingLog(t)
netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{})
require.NoError(t, err)

err = netA.Start()
require.NoError(t, err)
defer netA.Stop()

peerInfoA := netA.service.AddrInfo()
addrsA, err := peer.AddrInfoToP2pAddrs(&peerInfoA)
require.NoError(t, err)
require.NotZero(t, addrsA[0])

multiAddrStr := addrsA[0].String()
phoneBookAddresses := []string{multiAddrStr}

netB, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{})
require.NoError(t, err)
err = netB.Start()
require.NoError(t, err)
defer netB.Stop()

require.Eventually(
t,
func() bool {
return len(netA.service.ListPeersForTopic(p2p.TXTopicName)) > 0 &&
len(netB.service.ListPeersForTopic(p2p.TXTopicName)) > 0
},
2*time.Second,
50*time.Millisecond,
)

require.Eventually(t, func() bool {
return len(netA.wsPeers) > 0 && len(netB.wsPeers) > 0
}, 2*time.Second, 50*time.Millisecond)

counter := newMessageCounter(t, 1)
counterDone := counter.done
netA.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}})

// send 5 messages from both netB to netA
// since there is no node with listening address set => no messages should be received
for i := 0; i < 5; i++ {
err := netB.Relay(context.Background(), protocol.TxnTag, []byte{1, 2, 3, byte(i)}, true, nil)
require.NoError(t, err)
}

select {
case <-counterDone:
require.Fail(t, "No messages should have been received")
case <-time.After(1 * time.Second):
}

// add netC with listening address set, and enable relaying on netB
// ensure all messages are received by netA
cfg.NetAddress = "127.0.0.1:0"
netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{})
require.NoError(t, err)
err = netC.Start()
require.NoError(t, err)
defer netC.Stop()

netB.relayMessages = true

require.Eventually(
t,
func() bool {
return len(netA.service.ListPeersForTopic(p2p.TXTopicName)) > 0 &&
len(netB.service.ListPeersForTopic(p2p.TXTopicName)) > 0 &&
len(netC.service.ListPeersForTopic(p2p.TXTopicName)) > 0
},
2*time.Second,
50*time.Millisecond,
)

require.Eventually(t, func() bool {
return len(netA.wsPeers) > 0 && len(netB.wsPeers) > 0 && len(netC.wsPeers) > 0
}, 2*time.Second, 50*time.Millisecond)

const expectedMsgs = 10
counter = newMessageCounter(t, expectedMsgs)
counterDone = counter.done
netA.ClearHandlers()
netA.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}})

for i := 0; i < expectedMsgs/2; i++ {
err := netB.Relay(context.Background(), protocol.TxnTag, []byte{1, 2, 3, byte(i)}, true, nil)
require.NoError(t, err)
err = netC.Relay(context.Background(), protocol.TxnTag, []byte{11, 12, 10 + byte(i), 14}, true, nil)
require.NoError(t, err)
}
// send some duplicate messages, they should be dropped
for i := 0; i < expectedMsgs/2; i++ {
err := netB.Relay(context.Background(), protocol.TxnTag, []byte{1, 2, 3, byte(i)}, true, nil)
require.NoError(t, err)
}

select {
case <-counterDone:
case <-time.After(2 * time.Second):
if counter.count < expectedMsgs {
require.Failf(t, "One or more messages failed to reach destination network", "%d > %d", expectedMsgs, counter.count)
} else if counter.count > expectedMsgs {
require.Failf(t, "One or more messages that were expected to be dropped, reached destination network", "%d < %d", expectedMsgs, counter.count)
}
}
}

0 comments on commit 4959d39

Please sign in to comment.