diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index 17903e3492..dd830ac5d7 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -197,6 +197,15 @@ type gossipSubPeer struct { func (p gossipSubPeer) GetNetwork() GossipNode { return p.net } +func (p gossipSubPeer) OnClose(f func()) { + net := p.GetNetwork().(*P2PNetwork) + net.wsPeersLock.Lock() + defer net.wsPeersLock.Unlock() + if wsp, ok := net.wsPeers[p.peerID]; ok { + wsp.OnClose(f) + } +} + // NewP2PNetwork returns an instance of GossipNode that uses the p2p.Service func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebookAddresses []string, genesisID string, networkID protocol.NetworkID, node NodeInfo) (*P2PNetwork, error) { const readBufferLen = 2048 @@ -668,7 +677,7 @@ func (n *P2PNetwork) GetHTTPClient(address string) (*http.Client, error) { func (n *P2PNetwork) OnNetworkAdvance() { if n.nodeInfo != nil { old := n.wantTXGossip.Load() - new := n.nodeInfo.IsParticipating() + new := n.relayMessages || n.config.ForceFetchTransactions || n.nodeInfo.IsParticipating() if old != new { n.wantTXGossip.Store(new) if new { diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go index a4e7975eec..4e929723f6 100644 --- a/network/p2pNetwork_test.go +++ b/network/p2pNetwork_test.go @@ -907,9 +907,9 @@ func TestP2PWantTXGossip(t *testing.T) { ctx: ctx, nodeInfo: &nopeNodeInfo{}, } - net.wantTXGossip.Store(false) // ensure wantTXGossip from false to false is noop + net.wantTXGossip.Store(false) net.OnNetworkAdvance() require.Eventually(t, func() bool { net.wg.Wait(); return true }, 1*time.Second, 50*time.Millisecond) require.Equal(t, int64(0), mockService.count.Load()) @@ -923,14 +923,52 @@ func TestP2PWantTXGossip(t *testing.T) { require.False(t, net.wantTXGossip.Load()) // check false to true change triggers subscription + net.wantTXGossip.Store(false) net.nodeInfo = &participatingNodeInfo{} net.OnNetworkAdvance() require.Eventually(t, func() bool { return mockService.count.Load() == 1 }, 1*time.Second, 50*time.Millisecond) require.True(t, net.wantTXGossip.Load()) + // check IsParticipating changes wantTXGossip + net.wantTXGossip.Store(true) + net.nodeInfo = &nopeNodeInfo{} + net.config.ForceFetchTransactions = false + net.relayMessages = false + net.OnNetworkAdvance() + require.Eventually(t, func() bool { net.wg.Wait(); return true }, 1*time.Second, 50*time.Millisecond) + require.False(t, net.wantTXGossip.Load()) + + // check ForceFetchTransactions and relayMessages also take effect + net.wantTXGossip.Store(false) + net.nodeInfo = &nopeNodeInfo{} + net.config.ForceFetchTransactions = true + net.relayMessages = false + net.OnNetworkAdvance() + require.Eventually(t, func() bool { return mockService.count.Load() == 2 }, 1*time.Second, 50*time.Millisecond) + require.True(t, net.wantTXGossip.Load()) + + net.wantTXGossip.Store(false) + net.nodeInfo = &nopeNodeInfo{} + net.config.ForceFetchTransactions = false + net.relayMessages = true + net.OnNetworkAdvance() + require.Eventually(t, func() bool { return mockService.count.Load() == 3 }, 1*time.Second, 50*time.Millisecond) + require.True(t, net.wantTXGossip.Load()) + + // ensure empty nodeInfo prevents changing the value + net.wantTXGossip.Store(false) + net.nodeInfo = nil + net.config.ForceFetchTransactions = true + net.relayMessages = true + net.OnNetworkAdvance() + require.Eventually(t, func() bool { net.wg.Wait(); return true }, 1*time.Second, 50*time.Millisecond) + require.False(t, net.wantTXGossip.Load()) + // check true to true change is noop + net.wantTXGossip.Store(true) + net.nodeInfo = &participatingNodeInfo{} net.OnNetworkAdvance() - require.Eventually(t, func() bool { return mockService.count.Load() == 1 }, 1*time.Second, 50*time.Millisecond) + require.Eventually(t, func() bool { return mockService.count.Load() == 3 }, 1*time.Second, 50*time.Millisecond) require.True(t, net.wantTXGossip.Load()) } diff --git a/network/wsNetwork_test.go b/network/wsNetwork_test.go index db9be82975..038a9d6e2d 100644 --- a/network/wsNetwork_test.go +++ b/network/wsNetwork_test.go @@ -40,7 +40,6 @@ import ( "time" "github.com/algorand/go-algorand/internal/rapidgen" - "github.com/algorand/go-algorand/network/p2p" "github.com/algorand/go-algorand/network/phonebook" "pgregory.net/rapid" @@ -3293,14 +3292,12 @@ func TestWebsocketNetworkTXMessageOfInterestNPN(t *testing.T) { } type participatingNodeInfo struct { + nopeNodeInfo } func (nnni *participatingNodeInfo) IsParticipating() bool { return true } -func (nnni *participatingNodeInfo) Capabilities() []p2p.Capability { - return nil -} func TestWebsocketNetworkTXMessageOfInterestPN(t *testing.T) { // Tests that A->B follows MOI