Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

network: use http.ResponseController instead of GetHTTPRequestConnection #6044

Merged
merged 3 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions agreement/gossip/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,6 @@ func (w *whiteholeNetwork) GetPeers(options ...network.PeerOption) []network.Pee
}
func (w *whiteholeNetwork) RegisterHTTPHandler(path string, handler http.Handler) {
}
func (w *whiteholeNetwork) GetHTTPRequestConnection(request *http.Request) (conn network.DeadlineSettableConn) {
return nil
}

func (w *whiteholeNetwork) Start() error {
w.quit = make(chan struct{})
Expand Down
5 changes: 0 additions & 5 deletions components/mocks/mockNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,6 @@ func (network *MockNetwork) RegisterHTTPHandler(path string, handler http.Handle
// OnNetworkAdvance - empty implementation
func (network *MockNetwork) OnNetworkAdvance() {}

// GetHTTPRequestConnection - empty implementation
func (network *MockNetwork) GetHTTPRequestConnection(request *http.Request) (conn network.DeadlineSettableConn) {
return nil
}

// GetGenesisID - empty implementation
func (network *MockNetwork) GetGenesisID() string {
if network.GenesisID == "" {
Expand Down
12 changes: 0 additions & 12 deletions network/gossipNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"net/http"
"strings"
"time"

"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/protocol"
Expand Down Expand Up @@ -50,13 +49,6 @@ const (
PeersPhonebookArchivalNodes PeerOption = iota
)

// DeadlineSettableConn abstracts net.Conn and related types as deadline-settable
type DeadlineSettableConn interface {
SetDeadline(time.Time) error
SetReadDeadline(time.Time) error
SetWriteDeadline(time.Time) error
}

// GossipNode represents a node in the gossip network
type GossipNode interface {
Address() (string, bool)
Expand Down Expand Up @@ -104,10 +96,6 @@ type GossipNode interface {
// characteristics as with a watchdog timer.
OnNetworkAdvance()

// GetHTTPRequestConnection returns the underlying connection for the given request. Note that the request must be the same
// request that was provided to the http handler ( or provide a fallback Context() to that )
GetHTTPRequestConnection(request *http.Request) (conn DeadlineSettableConn)

// GetGenesisID returns the network-specific genesisID.
GetGenesisID() string

Expand Down
10 changes: 0 additions & 10 deletions network/hybridNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,16 +212,6 @@ func (n *HybridP2PNetwork) OnNetworkAdvance() {
})
}

// GetHTTPRequestConnection returns the underlying connection for the given request. Note that the request must be the same
// request that was provided to the http handler ( or provide a fallback Context() to that )
func (n *HybridP2PNetwork) GetHTTPRequestConnection(request *http.Request) (conn DeadlineSettableConn) {
conn = n.wsNetwork.GetHTTPRequestConnection(request)
if conn != nil {
return conn
}
return n.p2pNetwork.GetHTTPRequestConnection(request)
}

// GetGenesisID returns the network-specific genesisID.
func (n *HybridP2PNetwork) GetGenesisID() string {
return n.genesisID
Expand Down
50 changes: 1 addition & 49 deletions network/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
libp2phttp "github.com/libp2p/go-libp2p/p2p/http"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
"github.com/libp2p/go-libp2p/p2p/security/noise"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
Expand Down Expand Up @@ -67,8 +66,6 @@ type Service interface {
ListPeersForTopic(topic string) []peer.ID
Subscribe(topic string, val pubsub.ValidatorEx) (SubNextCancellable, error)
Publish(ctx context.Context, topic string, data []byte) error

GetStream(peer.ID) (network.Stream, bool)
}

// serviceImpl manages integration with libp2p and implements the Service interface
Expand Down Expand Up @@ -137,47 +134,7 @@ func MakeHost(cfg config.Local, datadir string, pstore *pstore.PeerStore) (host.
libp2p.Security(noise.ID, noise.New),
disableMetrics,
)
return &StreamChainingHost{
Host: host,
handlers: map[protocol.ID][]network.StreamHandler{},
}, listenAddr, err
}

// StreamChainingHost is a wrapper around host.Host that overrides SetStreamHandler
// to allow chaining multiple handlers for the same protocol.
// Note, there should be probably only single handler that writes/reads streams.
type StreamChainingHost struct {
host.Host
handlers map[protocol.ID][]network.StreamHandler
mutex deadlock.Mutex
}

// SetStreamHandler overrides the host.Host.SetStreamHandler method for chaining multiple handlers.
// Function objects are not comparable so theoretically it could have duplicates.
// The main use case is to track HTTP streams for ProtocolIDForMultistreamSelect = "/http/1.1"
// so it could just filter for such protocol if there any issues with other protocols like kad or mesh.
func (h *StreamChainingHost) SetStreamHandler(pid protocol.ID, handler network.StreamHandler) {
h.mutex.Lock()
defer h.mutex.Unlock()

handlers := h.handlers[pid]
if len(handlers) == 0 {
// no other handlers, do not set a proxy handler
h.Host.SetStreamHandler(pid, handler)
h.handlers[pid] = append(handlers, handler)
return
}
// otherwise chain the handlers with a copy of the existing handlers
handlers = append(handlers, handler)
// copy to save it in the closure and call lock free
currentHandlers := make([]network.StreamHandler, len(handlers))
copy(currentHandlers, handlers)
h.Host.SetStreamHandler(pid, func(s network.Stream) {
for _, h := range currentHandlers {
h(s)
}
})
h.handlers[pid] = handlers
return host, listenAddr, err
}

// MakeService creates a P2P service instance
Expand All @@ -186,7 +143,6 @@ func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h ho
sm := makeStreamManager(ctx, log, h, wsStreamHandler)
h.Network().Notify(sm)
h.SetStreamHandler(AlgorandWsProtocol, sm.streamHandler)
h.SetStreamHandler(libp2phttp.ProtocolIDForMultistreamSelect, sm.streamHandlerHTTP)

// set an empty handler for telemetryID/telemetryInstance protocol in order to allow other peers to know our telemetryID
telemetryID := log.GetTelemetryGUID()
Expand Down Expand Up @@ -294,10 +250,6 @@ func (s *serviceImpl) ClosePeer(peer peer.ID) error {
return s.host.Network().ClosePeer(peer)
}

func (s *serviceImpl) GetStream(peerID peer.ID) (network.Stream, bool) {
return s.streams.getStream(peerID)
}

// netAddressToListenAddress converts a netAddress in "ip:port" format to a listen address
// that can be passed in to libp2p.ListenAddrStrings
func netAddressToListenAddress(netAddress string) (string, error) {
Expand Down
66 changes: 0 additions & 66 deletions network/p2p/p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,14 @@ package p2p
import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"

"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/network/p2p/peerstore"
"github.com/algorand/go-algorand/test/partitiontest"
)

Expand Down Expand Up @@ -86,67 +81,6 @@ func TestNetAddressToListenAddress(t *testing.T) {
}
}

func TestP2PStreamingHost(t *testing.T) {
partitiontest.PartitionTest(t)

cfg := config.GetDefaultLocal()
dir := t.TempDir()
pstore, err := peerstore.NewPeerStore(nil, "")
require.NoError(t, err)
h, la, err := MakeHost(cfg, dir, pstore)
require.NoError(t, err)

var h1calls atomic.Int64
h1 := func(network.Stream) {
h1calls.Add(1)
}
var h2calls atomic.Int64
h2 := func(network.Stream) {
h2calls.Add(1)
}

ma, err := multiaddr.NewMultiaddr(la)
require.NoError(t, err)
h.Network().Listen(ma)
defer h.Close()

h.SetStreamHandler(AlgorandWsProtocol, h1)
h.SetStreamHandler(AlgorandWsProtocol, h2)

addrInfo := peer.AddrInfo{
ID: h.ID(),
Addrs: h.Addrs(),
}
cpstore, err := peerstore.NewPeerStore([]*peer.AddrInfo{&addrInfo}, "")
require.NoError(t, err)
c, _, err := MakeHost(cfg, dir, cpstore)
require.NoError(t, err)
defer c.Close()

s1, err := c.NewStream(context.Background(), h.ID(), AlgorandWsProtocol)
require.NoError(t, err)
s1.Write([]byte("hello"))
defer s1.Close()

require.Eventually(t, func() bool {
return h1calls.Load() == 1 && h2calls.Load() == 1
}, 5*time.Second, 100*time.Millisecond)

// ensure a single handler also works as expected
h1calls.Store(0)
h.SetStreamHandler(algorandP2pHTTPProtocol, h1)

s2, err := c.NewStream(context.Background(), h.ID(), algorandP2pHTTPProtocol)
require.NoError(t, err)
s2.Write([]byte("hello"))
defer s2.Close()

require.Eventually(t, func() bool {
return h1calls.Load() == 1
}, 5*time.Second, 100*time.Millisecond)

}

// TestP2PGetPeerTelemetryInfo tests the GetPeerTelemetryInfo function
func TestP2PGetPeerTelemetryInfo(t *testing.T) {
partitiontest.PartitionTest(t)
Expand Down
20 changes: 0 additions & 20 deletions network/p2p/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,20 +104,6 @@ func (n *streamManager) streamHandler(stream network.Stream) {
n.handler(n.ctx, remotePeer, stream, incoming)
}

// streamHandlerHTTP tracks the ProtocolIDForMultistreamSelect = "/http/1.1" streams
func (n *streamManager) streamHandlerHTTP(stream network.Stream) {
n.streamsLock.Lock()
defer n.streamsLock.Unlock()
n.streams[stream.Conn().LocalPeer()] = stream
}

func (n *streamManager) getStream(peerID peer.ID) (network.Stream, bool) {
n.streamsLock.Lock()
defer n.streamsLock.Unlock()
stream, ok := n.streams[peerID]
return stream, ok
}

// Connected is called when a connection is opened
func (n *streamManager) Connected(net network.Network, conn network.Conn) {
remotePeer := conn.RemotePeer()
Expand Down Expand Up @@ -174,12 +160,6 @@ func (n *streamManager) Disconnected(net network.Network, conn network.Conn) {
stream.Close()
delete(n.streams, conn.RemotePeer())
}

stream, ok = n.streams[conn.LocalPeer()]
if ok {
stream.Close()
delete(n.streams, conn.LocalPeer())
}
}

// Listen is called when network starts listening on an addr
Expand Down
18 changes: 0 additions & 18 deletions network/p2pNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package network
import (
"context"
"math/rand"
"net"
"net/http"
"strings"
"sync"
Expand Down Expand Up @@ -725,23 +724,6 @@ func (n *P2PNetwork) OnNetworkAdvance() {
}
}

// GetHTTPRequestConnection returns the underlying connection for the given request. Note that the request must be the same
// request that was provided to the http handler ( or provide a fallback Context() to that )
func (n *P2PNetwork) GetHTTPRequestConnection(request *http.Request) (conn DeadlineSettableConn) {
addr := request.Context().Value(http.LocalAddrContextKey).(net.Addr)
peerID, err := peer.Decode(addr.String())
if err != nil {
n.log.Infof("GetHTTPRequestConnection failed to decode %s", addr.String())
return nil
}
conn, ok := n.service.GetStream(peerID)
if !ok {
n.log.Warnf("GetHTTPRequestConnection no such stream for peer %s", peerID.String())
return nil
}
return conn
}

// wsStreamHandler is a callback that the p2p package calls when a new peer connects and establishes a
// stream for the websocket protocol.
func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2pPeer peer.ID, stream network.Stream, incoming bool) {
Expand Down
9 changes: 3 additions & 6 deletions network/p2pNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,10 +366,6 @@ func (s *mockService) Publish(ctx context.Context, topic string, data []byte) er
return nil
}

func (s *mockService) GetStream(peer.ID) (network.Stream, bool) {
return nil, false
}

func makeMockService(id peer.ID, addrs []ma.Multiaddr) *mockService {
return &mockService{
id: id,
Expand Down Expand Up @@ -725,8 +721,9 @@ type p2phttpHandler struct {
func (h *p2phttpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(h.retData))
if r.URL.Path == "/check-conn" {
c := h.net.GetHTTPRequestConnection(r)
require.NotNil(h.tb, c)
rc := http.NewResponseController(w)
err := rc.SetWriteDeadline(time.Now().Add(10 * time.Second))
require.NoError(h.tb, err)
}
}

Expand Down
Loading
Loading