diff --git a/network/p2p/http.go b/network/p2p/http.go index ea9d86a51c..f87bf456b9 100644 --- a/network/p2p/http.go +++ b/network/p2p/http.go @@ -88,9 +88,15 @@ func WithHost(h host.Host) httpClientOption { } } -// MakeHTTPClient creates a http.Client that uses libp2p transport for a given protocol and peer address. +// MakeTestHTTPClient creates a http.Client that uses libp2p transport for a given protocol and peer address. +// This exported method is only used in tests. +func MakeTestHTTPClient(addrInfo *peer.AddrInfo, opts ...httpClientOption) (*http.Client, error) { + return makeHTTPClient(addrInfo, opts...) +} + +// makeHTTPClient creates a http.Client that uses libp2p transport for a given protocol and peer address. // If service is nil, a new libp2p host is created. -func MakeHTTPClient(addrInfo *peer.AddrInfo, opts ...httpClientOption) (*http.Client, error) { +func makeHTTPClient(addrInfo *peer.AddrInfo, opts ...httpClientOption) (*http.Client, error) { var config httpClientConfig for _, opt := range opts { opt(&config) @@ -122,13 +128,13 @@ func MakeHTTPClient(addrInfo *peer.AddrInfo, opts ...httpClientOption) (*http.Cl return &http.Client{Transport: rt}, nil } -// MakeHTTPClientWithRateLimit creates a http.Client that uses libp2p transport for a given protocol and peer address. -func MakeHTTPClientWithRateLimit(addrInfo *peer.AddrInfo, service Service, pstore limitcaller.ConnectionTimeStore, queueingTimeout time.Duration) (*http.Client, error) { - cl, err := MakeHTTPClient(addrInfo, WithHost(service.(*serviceImpl).host)) +// makeHTTPClientWithRateLimit creates a http.Client that uses libp2p transport for a given protocol and peer address. +func makeHTTPClientWithRateLimit(addrInfo *peer.AddrInfo, p2pService *serviceImpl, connTimeStore limitcaller.ConnectionTimeStore, queueingTimeout time.Duration) (*http.Client, error) { + cl, err := makeHTTPClient(addrInfo, WithHost(p2pService.host)) if err != nil { return nil, err } - rltr := limitcaller.MakeRateLimitingBoundTransportWithRoundTripper(pstore, queueingTimeout, cl.Transport, string(addrInfo.ID)) + rltr := limitcaller.MakeRateLimitingBoundTransportWithRoundTripper(connTimeStore, queueingTimeout, cl.Transport, string(addrInfo.ID)) cl.Transport = &rltr return cl, nil diff --git a/network/p2p/p2p.go b/network/p2p/p2p.go index f281d1d13b..0e46b8cb0f 100644 --- a/network/p2p/p2p.go +++ b/network/p2p/p2p.go @@ -21,12 +21,14 @@ import ( "encoding/base32" "fmt" "net" + "net/http" "runtime" "strings" "time" "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/logging" + "github.com/algorand/go-algorand/network/limitcaller" pstore "github.com/algorand/go-algorand/network/p2p/peerstore" "github.com/algorand/go-algorand/network/phonebook" "github.com/algorand/go-algorand/util/metrics" @@ -69,6 +71,9 @@ type Service interface { ListPeersForTopic(topic string) []peer.ID Subscribe(topic string, val pubsub.ValidatorEx) (SubNextCancellable, error) Publish(ctx context.Context, topic string, data []byte) error + + // GetHTTPClient returns a rate-limiting libp2p-streaming http client that can be used to make requests to the given peer + GetHTTPClient(addrInfo *peer.AddrInfo, connTimeStore limitcaller.ConnectionTimeStore, queueingTimeout time.Duration) (*http.Client, error) } // serviceImpl manages integration with libp2p and implements the Service interface @@ -412,3 +417,8 @@ func addressFilter(addrs []multiaddr.Multiaddr) []multiaddr.Multiaddr { } return res } + +// GetHTTPClient returns a libp2p-streaming http client that can be used to make requests to the given peer +func (s *serviceImpl) GetHTTPClient(addrInfo *peer.AddrInfo, connTimeStore limitcaller.ConnectionTimeStore, queueingTimeout time.Duration) (*http.Client, error) { + return makeHTTPClientWithRateLimit(addrInfo, s, connTimeStore, queueingTimeout) +} diff --git a/network/p2p/testing/httpNode.go b/network/p2p/testing/httpNode.go index 523cdc5d4c..f73b26999f 100644 --- a/network/p2p/testing/httpNode.go +++ b/network/p2p/testing/httpNode.go @@ -104,7 +104,7 @@ func (p httpPeer) GetAddress() string { // GetAddress implements HTTPPeer interface and returns the http client for a peer func (p httpPeer) GetHTTPClient() *http.Client { - c, err := p2p.MakeHTTPClient(&p.addrInfo) + c, err := p2p.MakeTestHTTPClient(&p.addrInfo) require.NoError(p.tb, err) return c } diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index f6a1937c00..c145e08079 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -612,7 +612,7 @@ func addrInfoToWsPeerCore(n *P2PNetwork, addrInfo *peer.AddrInfo) (wsPeerCore, b } addr := mas[0].String() - client, err := p2p.MakeHTTPClientWithRateLimit(addrInfo, n.service, n.pstore, limitcaller.DefaultQueueingTimeout) + client, err := n.service.GetHTTPClient(addrInfo, n.pstore, limitcaller.DefaultQueueingTimeout) if err != nil { n.log.Warnf("MakeHTTPClient failed: %v", err) return wsPeerCore{}, false @@ -718,7 +718,7 @@ func (n *P2PNetwork) GetHTTPClient(address string) (*http.Client, error) { if err != nil { return nil, err } - return p2p.MakeHTTPClientWithRateLimit(addrInfo, n.service, n.pstore, limitcaller.DefaultQueueingTimeout) + return n.service.GetHTTPClient(addrInfo, n.pstore, limitcaller.DefaultQueueingTimeout) } // OnNetworkAdvance notifies the network library that the agreement protocol was able to make a notable progress. @@ -771,7 +771,7 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2pPeer peer.ID, strea // create a wsPeer for this stream and added it to the peers map. addrInfo := &peer.AddrInfo{ID: p2pPeer, Addrs: []multiaddr.Multiaddr{ma}} - client, err := p2p.MakeHTTPClientWithRateLimit(addrInfo, n.service, n.pstore, limitcaller.DefaultQueueingTimeout) + client, err := n.service.GetHTTPClient(addrInfo, n.pstore, limitcaller.DefaultQueueingTimeout) if err != nil { n.log.Warnf("Cannot construct HTTP Client for %s: %v", p2pPeer, err) client = nil diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go index e5de54cc99..47bd22197f 100644 --- a/network/p2pNetwork_test.go +++ b/network/p2pNetwork_test.go @@ -368,6 +368,10 @@ func (s *mockService) Publish(ctx context.Context, topic string, data []byte) er return nil } +func (s *mockService) GetHTTPClient(addrInfo *peer.AddrInfo, connTimeStore limitcaller.ConnectionTimeStore, queueingTimeout time.Duration) (*http.Client, error) { + return nil, nil +} + func makeMockService(id peer.ID, addrs []ma.Multiaddr) *mockService { return &mockService{ id: id, @@ -757,7 +761,7 @@ func TestP2PHTTPHandler(t *testing.T) { require.NoError(t, err) require.NotZero(t, addrsA[0]) - httpClient, err := p2p.MakeHTTPClient(&peerInfoA) + httpClient, err := p2p.MakeTestHTTPClient(&peerInfoA) require.NoError(t, err) resp, err := httpClient.Get("/test") require.NoError(t, err) @@ -768,7 +772,7 @@ func TestP2PHTTPHandler(t *testing.T) { require.Equal(t, "hello", string(body)) // check another endpoint that also access the underlying connection/stream - httpClient, err = p2p.MakeHTTPClient(&peerInfoA) + httpClient, err = p2p.MakeTestHTTPClient(&peerInfoA) require.NoError(t, err) resp, err = httpClient.Get("/check-conn") require.NoError(t, err) @@ -785,7 +789,7 @@ func TestP2PHTTPHandler(t *testing.T) { pstore, err := peerstore.MakePhonebook(0, 10*time.Second) require.NoError(t, err) pstore.AddPersistentPeers([]*peer.AddrInfo{&peerInfoA}, "net", phonebook.PhoneBookEntryRelayRole) - httpClient, err = p2p.MakeHTTPClientWithRateLimit(&peerInfoA, netB.service, pstore, 1*time.Second) + httpClient, err = netB.service.GetHTTPClient(&peerInfoA, pstore, 1*time.Second) require.NoError(t, err) _, err = httpClient.Get("/test") require.ErrorIs(t, err, limitcaller.ErrConnectionQueueingTimeout) @@ -817,7 +821,7 @@ func TestP2PHTTPHandlerAllInterfaces(t *testing.T) { require.NotZero(t, addrsB[0]) t.Logf("peerInfoB: %s", peerInfoA) - httpClient, err := p2p.MakeHTTPClient(&peerInfoA) + httpClient, err := p2p.MakeTestHTTPClient(&peerInfoA) require.NoError(t, err) resp, err := httpClient.Get("/test") require.NoError(t, err)