|
| 1 | +package network |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + "net/http" |
| 6 | + "sync/atomic" |
| 7 | + "testing" |
| 8 | + "time" |
| 9 | + |
| 10 | + "github.com/algorand/go-algorand/config" |
| 11 | + "github.com/algorand/go-algorand/logging" |
| 12 | + "github.com/algorand/go-algorand/network/limitcaller" |
| 13 | + p2piface "github.com/algorand/go-algorand/network/p2p" |
| 14 | + "github.com/algorand/go-algorand/protocol" |
| 15 | + "github.com/algorand/go-algorand/test/partitiontest" |
| 16 | + pubsub "github.com/libp2p/go-libp2p-pubsub" |
| 17 | + "github.com/libp2p/go-libp2p/core/network" |
| 18 | + "github.com/libp2p/go-libp2p/core/peer" |
| 19 | + "github.com/stretchr/testify/require" |
| 20 | +) |
| 21 | + |
| 22 | +// mockP2PService implements p2p.Service and counts DialPeersUntilTargetCount invocations. |
| 23 | +// It relies on p2p's meshThreadInner's defer of DialPeersUntilTargetCount to detect invocation. |
| 24 | +type mockP2PService struct{ dialCount atomic.Int32 } |
| 25 | + |
| 26 | +func (m *mockP2PService) Start() error { return nil } |
| 27 | +func (m *mockP2PService) Close() error { return nil } |
| 28 | +func (m *mockP2PService) ID() peer.ID { return "" } |
| 29 | +func (m *mockP2PService) IDSigner() *p2piface.PeerIDChallengeSigner { return nil } |
| 30 | +func (m *mockP2PService) AddrInfo() peer.AddrInfo { return peer.AddrInfo{} } |
| 31 | +func (m *mockP2PService) NetworkNotify(network.Notifiee) {} |
| 32 | +func (m *mockP2PService) NetworkStopNotify(network.Notifiee) {} |
| 33 | +func (m *mockP2PService) DialPeersUntilTargetCount(int) { m.dialCount.Add(1) } |
| 34 | +func (m *mockP2PService) ClosePeer(peer.ID) error { return nil } |
| 35 | +func (m *mockP2PService) Conns() []network.Conn { return nil } |
| 36 | +func (m *mockP2PService) ListPeersForTopic(string) []peer.ID { return nil } |
| 37 | +func (m *mockP2PService) Subscribe(string, pubsub.ValidatorEx) (p2piface.SubNextCancellable, error) { |
| 38 | + return nil, nil |
| 39 | +} |
| 40 | +func (m *mockP2PService) Publish(context.Context, string, []byte) error { return nil } |
| 41 | +func (m *mockP2PService) GetHTTPClient(*peer.AddrInfo, limitcaller.ConnectionTimeStore, time.Duration) (*http.Client, error) { |
| 42 | + return &http.Client{}, nil |
| 43 | +} |
| 44 | + |
| 45 | +// TestMesh_HybridRelayP2PInnerCall ensures the wsConnections <= targetConnCount condition |
| 46 | +// in the hybridRelayMeshCreator mesh function in order to make sure P2PNetwork.meshThreadInner is invoked |
| 47 | +func TestMesh_HybridRelayP2PInnerCall(t *testing.T) { |
| 48 | + partitiontest.PartitionTest(t) |
| 49 | + t.Parallel() |
| 50 | + |
| 51 | + cfg := config.GetDefaultLocal() |
| 52 | + cfg.GossipFanout = 0 |
| 53 | + cfg.DNSBootstrapID = "" |
| 54 | + cfg.EnableP2PHybridMode = true |
| 55 | + cfg.PublicAddress = "public-address" |
| 56 | + cfg.NetAddress = "127.0.0.1:0" |
| 57 | + cfg.P2PHybridNetAddress = "127.0.0.1:0" |
| 58 | + |
| 59 | + log := logging.TestingLog(t) |
| 60 | + genesisInfo := GenesisInfo{GenesisID: "test-genesis", NetworkID: protocol.NetworkID("test-network")} |
| 61 | + net, err := NewHybridP2PNetwork(log, cfg, "", nil, genesisInfo, &nopeNodeInfo{}, nil) |
| 62 | + require.NoError(t, err) |
| 63 | + |
| 64 | + mockSvc := &mockP2PService{} |
| 65 | + net.p2pNetwork.service = mockSvc |
| 66 | + net.p2pNetwork.relayMessages = false // prevent pubsub startup |
| 67 | + |
| 68 | + err = net.Start() |
| 69 | + require.NoError(t, err) |
| 70 | + defer net.Stop() |
| 71 | + |
| 72 | + net.RequestConnectOutgoing(false, nil) |
| 73 | + require.Eventually(t, func() bool { |
| 74 | + // RequestConnectOutgoing queues mesh update request so we have to wait a bit |
| 75 | + return mockSvc.dialCount.Load() > 0 |
| 76 | + }, 3*time.Second, 50*time.Millisecond, "expected DialPeersUntilTargetCount to be called") |
| 77 | +} |
0 commit comments