Skip to content

Commit

Permalink
fix(p2p): exchange protocol ID built using protocolSuffix string
Browse files Browse the repository at this point in the history
  • Loading branch information
distractedm1nd committed Oct 24, 2022
1 parent 270f229 commit 4d4b96c
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 25 deletions.
21 changes: 12 additions & 9 deletions header/p2p/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (

"github.com/celestiaorg/celestia-node/header"
p2p_pb "github.com/celestiaorg/celestia-node/header/p2p/pb"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
)

var log = logging.Logger("header/p2p")
Expand All @@ -38,19 +37,24 @@ const (
// gossipsub topic.
const PubSubTopic = "header-sub"

var exchangeProtocolID = protocol.ID(fmt.Sprintf("/header-ex/v0.0.3/%s", p2p.DefaultNetwork))

// Exchange enables sending outbound ExtendedHeaderRequests to the network as well as
// handling inbound ExtendedHeaderRequests from the network.
type Exchange struct {
protocolID protocol.ID

host host.Host

trustedPeers peer.IDSlice
}

func NewExchange(host host.Host, peers peer.IDSlice) *Exchange {
func protocolID(protocolSuffix string) protocol.ID {
return protocol.ID(fmt.Sprintf("/header-ex/v0.0.3/%s", protocolSuffix))
}

func NewExchange(host host.Host, peers peer.IDSlice, protocolSuffix string) *Exchange {
return &Exchange{
host: host,
protocolID: protocolID(protocolSuffix),
trustedPeers: peers,
}
}
Expand All @@ -69,7 +73,7 @@ func (ex *Exchange) Head(ctx context.Context) (*header.ExtendedHeader, error) {
// request head from each trusted peer
for _, from := range ex.trustedPeers {
go func(from peer.ID) {
headers, err := request(ctx, from, ex.host, req)
headers, err := ex.request(ctx, from, req)
if err != nil {
log.Errorw("head request to trusted peer failed", "trustedPeer", from, "err", err)
headerCh <- nil
Expand Down Expand Up @@ -162,17 +166,16 @@ func (ex *Exchange) performRequest(

//nolint:gosec // G404: Use of weak random number generator
index := rand.Intn(len(ex.trustedPeers))
return request(ctx, ex.trustedPeers[index], ex.host, req)
return ex.request(ctx, ex.trustedPeers[index], req)
}

// request sends the ExtendedHeaderRequest to a remote peer.
func request(
func (ex *Exchange) request(
ctx context.Context,
to peer.ID,
host host.Host,
req *p2p_pb.ExtendedHeaderRequest,
) ([]*header.ExtendedHeader, error) {
stream, err := host.NewStream(ctx, to, exchangeProtocolID)
stream, err := ex.host.NewStream(ctx, to, ex.protocolID)
if err != nil {
return nil, err
}
Expand Down
12 changes: 6 additions & 6 deletions header/p2p/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,15 @@ func TestExchange_RequestByHash(t *testing.T) {
host, peer := net.Hosts()[0], net.Hosts()[1]
// create and start the ExchangeServer
store := createStore(t, 5)
serv := NewExchangeServer(host, store)
serv := NewExchangeServer(host, store, "private")
err = serv.Start(ctx)
require.NoError(t, err)
t.Cleanup(func() {
serv.Stop(context.Background()) //nolint:errcheck
})

// start a new stream via Peer to see if Host can handle inbound requests
stream, err := peer.NewStream(context.Background(), libhost.InfoFromHost(host).ID, exchangeProtocolID)
stream, err := peer.NewStream(context.Background(), libhost.InfoFromHost(host).ID, protocolID("private"))
require.NoError(t, err)
// create request for a header at a random height
reqHeight := store.headHeight - 2
Expand Down Expand Up @@ -205,14 +205,14 @@ func TestExchange_RequestByHashFails(t *testing.T) {
require.NoError(t, err)
// get host and peer
host, peer := net.Hosts()[0], net.Hosts()[1]
serv := NewExchangeServer(host, createStore(t, 0))
serv := NewExchangeServer(host, createStore(t, 0), "private")
err = serv.Start(ctx)
require.NoError(t, err)
t.Cleanup(func() {
serv.Stop(context.Background()) //nolint:errcheck
})

stream, err := peer.NewStream(context.Background(), libhost.InfoFromHost(host).ID, exchangeProtocolID)
stream, err := peer.NewStream(context.Background(), libhost.InfoFromHost(host).ID, protocolID("private"))
require.NoError(t, err)
req := &p2p_pb.ExtendedHeaderRequest{
Data: &p2p_pb.ExtendedHeaderRequest_Hash{Hash: []byte("dummy_hash")},
Expand All @@ -238,15 +238,15 @@ func createMocknet(t *testing.T) (libhost.Host, libhost.Host) {
// createP2PExAndServer creates a Exchange with 5 headers already in its store.
func createP2PExAndServer(t *testing.T, host, tpeer libhost.Host) (header.Exchange, *mockStore) {
store := createStore(t, 5)
serverSideEx := NewExchangeServer(tpeer, store)
serverSideEx := NewExchangeServer(tpeer, store, "private")
err := serverSideEx.Start(context.Background())
require.NoError(t, err)

t.Cleanup(func() {
serverSideEx.Stop(context.Background()) //nolint:errcheck
})

return NewExchange(host, []peer.ID{tpeer.ID()}), store
return NewExchange(host, []peer.ID{tpeer.ID()}, "private"), store
}

type mockStore struct {
Expand Down
14 changes: 9 additions & 5 deletions header/p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/protocol"
tmbytes "github.com/tendermint/tendermint/libs/bytes"

"github.com/celestiaorg/go-libp2p-messenger/serde"
Expand All @@ -17,6 +18,8 @@ import (
// ExchangeServer represents the server-side component for
// responding to inbound header-related requests.
type ExchangeServer struct {
protocolID protocol.ID

host host.Host
store header.Store

Expand All @@ -26,10 +29,11 @@ type ExchangeServer struct {

// NewExchangeServer returns a new P2P server that handles inbound
// header-related requests.
func NewExchangeServer(host host.Host, store header.Store) *ExchangeServer {
func NewExchangeServer(host host.Host, store header.Store, protocolSuffix string) *ExchangeServer {
return &ExchangeServer{
host: host,
store: store,
protocolID: protocolID(protocolSuffix),
host: host,
store: store,
}
}

Expand All @@ -38,7 +42,7 @@ func (serv *ExchangeServer) Start(context.Context) error {
serv.ctx, serv.cancel = context.WithCancel(context.Background())
log.Info("server: listening for inbound header requests")

serv.host.SetStreamHandler(exchangeProtocolID, serv.requestHandler)
serv.host.SetStreamHandler(serv.protocolID, serv.requestHandler)

return nil
}
Expand All @@ -47,7 +51,7 @@ func (serv *ExchangeServer) Start(context.Context) error {
func (serv *ExchangeServer) Stop(context.Context) error {
log.Info("server: stopping server")
serv.cancel()
serv.host.RemoveStreamHandler(exchangeProtocolID)
serv.host.RemoveStreamHandler(serv.protocolID)
return nil
}

Expand Down
13 changes: 9 additions & 4 deletions nodebuilder/header/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@ import (
modp2p "github.com/celestiaorg/celestia-node/nodebuilder/p2p"
)

// newP2PExchange constructs new Exchange for headers.
func newP2PExchange(cfg Config) func(modp2p.Bootstrappers, host.Host) (header.Exchange, error) {
return func(bpeers modp2p.Bootstrappers, host host.Host) (header.Exchange, error) {
// newP2PServer constructs a new ExchangeServer using the given Network as a protocolID suffix.
func newP2PServer(host host.Host, store header.Store, network modp2p.Network) *p2p.ExchangeServer {
return p2p.NewExchangeServer(host, store, string(network))
}

// newP2PExchange constructs a new Exchange for headers.
func newP2PExchange(cfg Config) func(modp2p.Bootstrappers, modp2p.Network, host.Host) (header.Exchange, error) {
return func(bpeers modp2p.Bootstrappers, network modp2p.Network, host host.Host) (header.Exchange, error) {
peers, err := cfg.trustedPeers(bpeers)
if err != nil {
return nil, err
Expand All @@ -28,7 +33,7 @@ func newP2PExchange(cfg Config) func(modp2p.Bootstrappers, host.Host) (header.Ex
ids[index] = peer.ID
host.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL)
}
return p2p.NewExchange(host, ids), nil
return p2p.NewExchange(host, ids, string(network)), nil
}
}

Expand Down
2 changes: 1 addition & 1 deletion nodebuilder/header/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
}),
)),
fx.Provide(fx.Annotate(
p2p.NewExchangeServer,
newP2PServer,
fx.OnStart(func(ctx context.Context, server *p2p.ExchangeServer) error {
return server.Start(ctx)
}),
Expand Down

0 comments on commit 4d4b96c

Please sign in to comment.