Skip to content

Commit

Permalink
add GetHTTPClient to p2p.Service interface
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy committed Sep 13, 2024
1 parent 13f21a9 commit 35c3ec5
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 14 deletions.
18 changes: 12 additions & 6 deletions network/p2p/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,15 @@ func WithHost(h host.Host) httpClientOption {
}
}

// MakeHTTPClient creates a http.Client that uses libp2p transport for a given protocol and peer address.
// MakeTestHTTPClient creates a http.Client that uses libp2p transport for a given protocol and peer address.
// This exported method is only used in tests.
func MakeTestHTTPClient(addrInfo *peer.AddrInfo, opts ...httpClientOption) (*http.Client, error) {
return makeHTTPClient(addrInfo, opts...)

Check warning on line 94 in network/p2p/http.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/http.go#L93-L94

Added lines #L93 - L94 were not covered by tests
}

// makeHTTPClient creates a http.Client that uses libp2p transport for a given protocol and peer address.
// If service is nil, a new libp2p host is created.
func MakeHTTPClient(addrInfo *peer.AddrInfo, opts ...httpClientOption) (*http.Client, error) {
func makeHTTPClient(addrInfo *peer.AddrInfo, opts ...httpClientOption) (*http.Client, error) {
var config httpClientConfig
for _, opt := range opts {
opt(&config)

Check warning on line 102 in network/p2p/http.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/http.go#L99-L102

Added lines #L99 - L102 were not covered by tests
Expand Down Expand Up @@ -122,13 +128,13 @@ func MakeHTTPClient(addrInfo *peer.AddrInfo, opts ...httpClientOption) (*http.Cl
return &http.Client{Transport: rt}, nil
}

// MakeHTTPClientWithRateLimit creates a http.Client that uses libp2p transport for a given protocol and peer address.
func MakeHTTPClientWithRateLimit(addrInfo *peer.AddrInfo, service Service, pstore limitcaller.ConnectionTimeStore, queueingTimeout time.Duration) (*http.Client, error) {
cl, err := MakeHTTPClient(addrInfo, WithHost(service.(*serviceImpl).host))
// makeHTTPClientWithRateLimit creates a http.Client that uses libp2p transport for a given protocol and peer address.
func makeHTTPClientWithRateLimit(addrInfo *peer.AddrInfo, p2pService *serviceImpl, connTimeStore limitcaller.ConnectionTimeStore, queueingTimeout time.Duration) (*http.Client, error) {
cl, err := makeHTTPClient(addrInfo, WithHost(p2pService.host))

Check warning on line 133 in network/p2p/http.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/http.go#L132-L133

Added lines #L132 - L133 were not covered by tests
if err != nil {
return nil, err
}
rltr := limitcaller.MakeRateLimitingBoundTransportWithRoundTripper(pstore, queueingTimeout, cl.Transport, string(addrInfo.ID))
rltr := limitcaller.MakeRateLimitingBoundTransportWithRoundTripper(connTimeStore, queueingTimeout, cl.Transport, string(addrInfo.ID))

Check warning on line 137 in network/p2p/http.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/http.go#L137

Added line #L137 was not covered by tests
cl.Transport = &rltr
return cl, nil

Expand Down
10 changes: 10 additions & 0 deletions network/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ import (
"encoding/base32"
"fmt"
"net"
"net/http"
"runtime"
"strings"
"time"

"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/network/limitcaller"
pstore "github.com/algorand/go-algorand/network/p2p/peerstore"
"github.com/algorand/go-algorand/network/phonebook"
"github.com/algorand/go-algorand/util/metrics"
Expand Down Expand Up @@ -69,6 +71,9 @@ type Service interface {
ListPeersForTopic(topic string) []peer.ID
Subscribe(topic string, val pubsub.ValidatorEx) (SubNextCancellable, error)
Publish(ctx context.Context, topic string, data []byte) error

// GetHTTPClient returns a rate-limiting libp2p-streaming http client that can be used to make requests to the given peer
GetHTTPClient(addrInfo *peer.AddrInfo, connTimeStore limitcaller.ConnectionTimeStore, queueingTimeout time.Duration) (*http.Client, error)
}

// serviceImpl manages integration with libp2p and implements the Service interface
Expand Down Expand Up @@ -412,3 +417,8 @@ func addressFilter(addrs []multiaddr.Multiaddr) []multiaddr.Multiaddr {
}
return res
}

// GetHTTPClient returns a libp2p-streaming http client that can be used to make requests to the given peer
func (s *serviceImpl) GetHTTPClient(addrInfo *peer.AddrInfo, connTimeStore limitcaller.ConnectionTimeStore, queueingTimeout time.Duration) (*http.Client, error) {
return makeHTTPClientWithRateLimit(addrInfo, s, connTimeStore, queueingTimeout)

Check warning on line 423 in network/p2p/p2p.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/p2p.go#L422-L423

Added lines #L422 - L423 were not covered by tests
}
2 changes: 1 addition & 1 deletion network/p2p/testing/httpNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (p httpPeer) GetAddress() string {

// GetAddress implements HTTPPeer interface and returns the http client for a peer
func (p httpPeer) GetHTTPClient() *http.Client {
c, err := p2p.MakeHTTPClient(&p.addrInfo)
c, err := p2p.MakeTestHTTPClient(&p.addrInfo)
require.NoError(p.tb, err)
return c
}
Expand Down
6 changes: 3 additions & 3 deletions network/p2pNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ func addrInfoToWsPeerCore(n *P2PNetwork, addrInfo *peer.AddrInfo) (wsPeerCore, b
}
addr := mas[0].String()

client, err := p2p.MakeHTTPClientWithRateLimit(addrInfo, n.service, n.pstore, limitcaller.DefaultQueueingTimeout)
client, err := n.service.GetHTTPClient(addrInfo, n.pstore, limitcaller.DefaultQueueingTimeout)
if err != nil {
n.log.Warnf("MakeHTTPClient failed: %v", err)
return wsPeerCore{}, false
Expand Down Expand Up @@ -718,7 +718,7 @@ func (n *P2PNetwork) GetHTTPClient(address string) (*http.Client, error) {
if err != nil {
return nil, err
}
return p2p.MakeHTTPClientWithRateLimit(addrInfo, n.service, n.pstore, limitcaller.DefaultQueueingTimeout)
return n.service.GetHTTPClient(addrInfo, n.pstore, limitcaller.DefaultQueueingTimeout)

Check warning on line 721 in network/p2pNetwork.go

View check run for this annotation

Codecov / codecov/patch

network/p2pNetwork.go#L721

Added line #L721 was not covered by tests
}

// OnNetworkAdvance notifies the network library that the agreement protocol was able to make a notable progress.
Expand Down Expand Up @@ -771,7 +771,7 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2pPeer peer.ID, strea

// create a wsPeer for this stream and added it to the peers map.
addrInfo := &peer.AddrInfo{ID: p2pPeer, Addrs: []multiaddr.Multiaddr{ma}}
client, err := p2p.MakeHTTPClientWithRateLimit(addrInfo, n.service, n.pstore, limitcaller.DefaultQueueingTimeout)
client, err := n.service.GetHTTPClient(addrInfo, n.pstore, limitcaller.DefaultQueueingTimeout)
if err != nil {
n.log.Warnf("Cannot construct HTTP Client for %s: %v", p2pPeer, err)
client = nil
Expand Down
12 changes: 8 additions & 4 deletions network/p2pNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,10 @@ func (s *mockService) Publish(ctx context.Context, topic string, data []byte) er
return nil
}

func (s *mockService) GetHTTPClient(addrInfo *peer.AddrInfo, connTimeStore limitcaller.ConnectionTimeStore, queueingTimeout time.Duration) (*http.Client, error) {
return nil, nil
}

func makeMockService(id peer.ID, addrs []ma.Multiaddr) *mockService {
return &mockService{
id: id,
Expand Down Expand Up @@ -757,7 +761,7 @@ func TestP2PHTTPHandler(t *testing.T) {
require.NoError(t, err)
require.NotZero(t, addrsA[0])

httpClient, err := p2p.MakeHTTPClient(&peerInfoA)
httpClient, err := p2p.MakeTestHTTPClient(&peerInfoA)
require.NoError(t, err)
resp, err := httpClient.Get("/test")
require.NoError(t, err)
Expand All @@ -768,7 +772,7 @@ func TestP2PHTTPHandler(t *testing.T) {
require.Equal(t, "hello", string(body))

// check another endpoint that also access the underlying connection/stream
httpClient, err = p2p.MakeHTTPClient(&peerInfoA)
httpClient, err = p2p.MakeTestHTTPClient(&peerInfoA)
require.NoError(t, err)
resp, err = httpClient.Get("/check-conn")
require.NoError(t, err)
Expand All @@ -785,7 +789,7 @@ func TestP2PHTTPHandler(t *testing.T) {
pstore, err := peerstore.MakePhonebook(0, 10*time.Second)
require.NoError(t, err)
pstore.AddPersistentPeers([]*peer.AddrInfo{&peerInfoA}, "net", phonebook.PhoneBookEntryRelayRole)
httpClient, err = p2p.MakeHTTPClientWithRateLimit(&peerInfoA, netB.service, pstore, 1*time.Second)
httpClient, err = netB.service.GetHTTPClient(&peerInfoA, pstore, 1*time.Second)
require.NoError(t, err)
_, err = httpClient.Get("/test")
require.ErrorIs(t, err, limitcaller.ErrConnectionQueueingTimeout)
Expand Down Expand Up @@ -817,7 +821,7 @@ func TestP2PHTTPHandlerAllInterfaces(t *testing.T) {
require.NotZero(t, addrsB[0])

t.Logf("peerInfoB: %s", peerInfoA)
httpClient, err := p2p.MakeHTTPClient(&peerInfoA)
httpClient, err := p2p.MakeTestHTTPClient(&peerInfoA)
require.NoError(t, err)
resp, err := httpClient.Get("/test")
require.NoError(t, err)
Expand Down

0 comments on commit 35c3ec5

Please sign in to comment.